Skip to content

Commit ef61a64

Browse files
kylebrandtryantxu
andauthored
Azure Monitor: Log Analytics response to data frames (grafana#25297)
Co-authored-by: Ryan McKinley <[email protected]>
1 parent c3549f8 commit ef61a64

13 files changed

+582
-661
lines changed

packages/grafana-runtime/src/utils/DataSourceWithBackend.ts

+21-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
DataQuery,
77
DataSourceJsonData,
88
ScopedVars,
9+
DataFrame,
910
} from '@grafana/data';
1011
import { Observable, from, of } from 'rxjs';
1112
import { config } from '..';
@@ -109,16 +110,34 @@ export class DataSourceWithBackend<
109110
requestId,
110111
})
111112
.then((rsp: any) => {
112-
return toDataQueryResponse(rsp);
113+
const dqs = toDataQueryResponse(rsp);
114+
if (this.processResponse) {
115+
return this.processResponse(dqs);
116+
}
117+
return dqs;
113118
})
114119
.catch(err => {
115120
err.isHandled = true; // Avoid extra popup warning
116-
return toDataQueryResponse(err);
121+
const dqs = toDataQueryResponse(err);
122+
if (this.processResponse) {
123+
return this.processResponse(dqs);
124+
}
125+
return dqs;
117126
});
118127

119128
return from(req);
120129
}
121130

131+
/**
132+
* Optionally augment the response before returning the results to the
133+
*/
134+
processResponse?(res: DataQueryResponse): Promise<DataQueryResponse>;
135+
136+
/**
137+
* Optionally process the results for display
138+
*/
139+
processDataFrameResult?(frame: DataFrame, idx: number): Promise<DataFrame>;
140+
122141
/**
123142
* Override to skip executing a query
124143
*

pkg/tsdb/azuremonitor/azure-log-analytics-datasource.go

+70-169
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,16 @@ import (
44
"bytes"
55
"compress/gzip"
66
"context"
7-
"encoding/base64"
87
"encoding/json"
98
"errors"
109
"fmt"
1110
"io/ioutil"
1211
"net/http"
1312
"net/url"
1413
"path"
15-
"time"
1614

15+
"github.com/grafana/grafana-plugin-sdk-go/data"
1716
"github.com/grafana/grafana/pkg/api/pluginproxy"
18-
"github.com/grafana/grafana/pkg/components/null"
1917
"github.com/grafana/grafana/pkg/components/simplejson"
2018
"github.com/grafana/grafana/pkg/models"
2119
"github.com/grafana/grafana/pkg/plugins"
@@ -58,11 +56,7 @@ func (e *AzureLogAnalyticsDatasource) executeTimeSeriesQuery(ctx context.Context
5856
}
5957

6058
for _, query := range queries {
61-
queryRes, err := e.executeQuery(ctx, query, originalQueries, timeRange)
62-
if err != nil {
63-
queryRes.Error = err
64-
}
65-
result.Results[query.RefID] = queryRes
59+
result.Results[query.RefID] = e.executeQuery(ctx, query, originalQueries, timeRange)
6660
}
6761

6862
return result, nil
@@ -115,13 +109,17 @@ func (e *AzureLogAnalyticsDatasource) buildQueries(queries []*tsdb.Query, timeRa
115109
return azureLogAnalyticsQueries, nil
116110
}
117111

118-
func (e *AzureLogAnalyticsDatasource) executeQuery(ctx context.Context, query *AzureLogAnalyticsQuery, queries []*tsdb.Query, timeRange *tsdb.TimeRange) (*tsdb.QueryResult, error) {
112+
func (e *AzureLogAnalyticsDatasource) executeQuery(ctx context.Context, query *AzureLogAnalyticsQuery, queries []*tsdb.Query, timeRange *tsdb.TimeRange) *tsdb.QueryResult {
119113
queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: query.RefID}
120114

115+
queryResultError := func(err error) *tsdb.QueryResult {
116+
queryResult.Error = err
117+
return queryResult
118+
}
119+
121120
req, err := e.createRequest(ctx, e.dsInfo)
122121
if err != nil {
123-
queryResult.Error = err
124-
return queryResult, nil
122+
return queryResultError(err)
125123
}
126124

127125
req.URL.Path = path.Join(req.URL.Path, query.URL)
@@ -140,38 +138,52 @@ func (e *AzureLogAnalyticsDatasource) executeQuery(ctx context.Context, query *A
140138
span.Context(),
141139
opentracing.HTTPHeaders,
142140
opentracing.HTTPHeadersCarrier(req.Header)); err != nil {
143-
queryResult.Error = err
144-
return queryResult, nil
141+
return queryResultError(err)
145142
}
146143

147144
azlog.Debug("AzureLogAnalytics", "Request ApiURL", req.URL.String())
148145
res, err := ctxhttp.Do(ctx, e.httpClient, req)
149146
if err != nil {
150-
queryResult.Error = err
151-
return queryResult, nil
147+
return queryResultError(err)
152148
}
153149

154-
data, err := e.unmarshalResponse(res)
150+
logResponse, err := e.unmarshalResponse(res)
155151
if err != nil {
156-
queryResult.Error = err
157-
return queryResult, nil
152+
return queryResultError(err)
158153
}
159154

160-
azlog.Debug("AzureLogsAnalytics", "Response", queryResult)
155+
t, err := logResponse.GetPrimaryResultTable()
156+
if err != nil {
157+
return queryResultError(err)
158+
}
161159

162-
if query.ResultFormat == "table" {
163-
queryResult.Tables, queryResult.Meta, err = e.parseToTables(data, query.Model, query.Params)
164-
if err != nil {
165-
return nil, err
166-
}
167-
} else {
168-
queryResult.Series, queryResult.Meta, err = e.parseToTimeSeries(data, query.Model, query.Params)
169-
if err != nil {
170-
return nil, err
171-
}
160+
frame, err := LogTableToFrame(t)
161+
if err != nil {
162+
return queryResultError(err)
172163
}
173164

174-
return queryResult, nil
165+
setAdditionalFrameMeta(frame,
166+
query.Params.Get("query"),
167+
query.Model.Get("subscriptionId").MustString(),
168+
query.Model.Get("azureLogAnalytics").Get("workspace").MustString())
169+
170+
if query.ResultFormat == "time_series" {
171+
tsSchema := frame.TimeSeriesSchema()
172+
if tsSchema.Type == data.TimeSeriesTypeLong {
173+
wideFrame, err := data.LongToWide(frame, &data.FillMissing{})
174+
if err == nil {
175+
frame = wideFrame
176+
} else {
177+
frame.AppendNotices(data.Notice{Severity: data.NoticeSeverityWarning, Text: "could not convert frame to time series, returning raw table: " + err.Error()})
178+
}
179+
}
180+
}
181+
frames := data.Frames{frame}
182+
queryResult.Dataframes, err = frames.MarshalArrow()
183+
if err != nil {
184+
return queryResultError(err)
185+
}
186+
return queryResult
175187
}
176188

177189
func (e *AzureLogAnalyticsDatasource) createRequest(ctx context.Context, dsInfo *models.DataSource) (*http.Request, error) {
@@ -225,6 +237,17 @@ func (e *AzureLogAnalyticsDatasource) getPluginRoute(plugin *plugins.DataSourceP
225237
return logAnalyticsRoute, pluginRouteName, nil
226238
}
227239

240+
// GetPrimaryResultTable returns the first table in the response named "PrimaryResult", or an
241+
// error if there is no table by that name.
242+
func (ar *AzureLogAnalyticsResponse) GetPrimaryResultTable() (*AzureLogAnalyticsTable, error) {
243+
for _, t := range ar.Tables {
244+
if t.Name == "PrimaryResult" {
245+
return &t, nil
246+
}
247+
}
248+
return nil, fmt.Errorf("no data as PrimaryResult table is missing from the the response")
249+
}
250+
228251
func (e *AzureLogAnalyticsDatasource) unmarshalResponse(res *http.Response) (AzureLogAnalyticsResponse, error) {
229252
body, err := ioutil.ReadAll(res.Body)
230253
defer res.Body.Close()
@@ -239,7 +262,9 @@ func (e *AzureLogAnalyticsDatasource) unmarshalResponse(res *http.Response) (Azu
239262
}
240263

241264
var data AzureLogAnalyticsResponse
242-
err = json.Unmarshal(body, &data)
265+
d := json.NewDecoder(bytes.NewReader(body))
266+
d.UseNumber()
267+
err = d.Decode(&data)
243268
if err != nil {
244269
azlog.Debug("Failed to unmarshal Azure Log Analytics response", "error", err, "status", res.Status, "body", string(body))
245270
return AzureLogAnalyticsResponse{}, err
@@ -248,153 +273,29 @@ func (e *AzureLogAnalyticsDatasource) unmarshalResponse(res *http.Response) (Azu
248273
return data, nil
249274
}
250275

251-
func (e *AzureLogAnalyticsDatasource) parseToTables(data AzureLogAnalyticsResponse, model *simplejson.Json, params url.Values) ([]*tsdb.Table, *simplejson.Json, error) {
252-
meta, err := createMetadata(model, params)
253-
if err != nil {
254-
return nil, simplejson.NewFromAny(meta), err
276+
func setAdditionalFrameMeta(frame *data.Frame, query, subscriptionID, workspace string) {
277+
frame.Meta.ExecutedQueryString = query
278+
frame.Meta.Custom["subscription"] = subscriptionID
279+
frame.Meta.Custom["workspace"] = workspace
280+
encodedQuery, err := encodeQuery(query)
281+
if err == nil {
282+
frame.Meta.Custom["encodedQuery"] = encodedQuery
283+
return
255284
}
256-
257-
tables := make([]*tsdb.Table, 0)
258-
for _, t := range data.Tables {
259-
if t.Name == "PrimaryResult" {
260-
table := tsdb.Table{
261-
Columns: make([]tsdb.TableColumn, 0),
262-
Rows: make([]tsdb.RowValues, 0),
263-
}
264-
265-
meta.Columns = make([]column, 0)
266-
for _, v := range t.Columns {
267-
meta.Columns = append(meta.Columns, column{Name: v.Name, Type: v.Type})
268-
table.Columns = append(table.Columns, tsdb.TableColumn{Text: v.Name})
269-
}
270-
271-
for _, r := range t.Rows {
272-
values := make([]interface{}, len(table.Columns))
273-
for i := 0; i < len(table.Columns); i++ {
274-
values[i] = r[i]
275-
}
276-
table.Rows = append(table.Rows, values)
277-
}
278-
tables = append(tables, &table)
279-
return tables, simplejson.NewFromAny(meta), nil
280-
}
281-
}
282-
283-
return nil, nil, errors.New("no data as no PrimaryResult table was returned in the response")
284-
}
285-
286-
func (e *AzureLogAnalyticsDatasource) parseToTimeSeries(data AzureLogAnalyticsResponse, model *simplejson.Json, params url.Values) (tsdb.TimeSeriesSlice, *simplejson.Json, error) {
287-
meta, err := createMetadata(model, params)
288-
if err != nil {
289-
return nil, simplejson.NewFromAny(meta), err
290-
}
291-
292-
for _, t := range data.Tables {
293-
if t.Name == "PrimaryResult" {
294-
timeIndex, metricIndex, valueIndex := -1, -1, -1
295-
meta.Columns = make([]column, 0)
296-
for i, v := range t.Columns {
297-
meta.Columns = append(meta.Columns, column{Name: v.Name, Type: v.Type})
298-
299-
if timeIndex == -1 && v.Type == "datetime" {
300-
timeIndex = i
301-
}
302-
303-
if metricIndex == -1 && v.Type == "string" {
304-
metricIndex = i
305-
}
306-
307-
if valueIndex == -1 && (v.Type == "int" || v.Type == "long" || v.Type == "real" || v.Type == "double") {
308-
valueIndex = i
309-
}
310-
}
311-
312-
if timeIndex == -1 {
313-
azlog.Info("No time column specified. Returning existing columns, no data")
314-
return nil, simplejson.NewFromAny(meta), nil
315-
}
316-
317-
if valueIndex == -1 {
318-
azlog.Info("No value column specified. Returning existing columns, no data")
319-
return nil, simplejson.NewFromAny(meta), nil
320-
}
321-
322-
slice := tsdb.TimeSeriesSlice{}
323-
buckets := map[string]*tsdb.TimeSeriesPoints{}
324-
325-
getSeriesBucket := func(metricName string) *tsdb.TimeSeriesPoints {
326-
if points, ok := buckets[metricName]; ok {
327-
return points
328-
}
329-
330-
series := tsdb.NewTimeSeries(metricName, []tsdb.TimePoint{})
331-
slice = append(slice, series)
332-
buckets[metricName] = &series.Points
333-
334-
return &series.Points
335-
}
336-
337-
for _, r := range t.Rows {
338-
timeStr, ok := r[timeIndex].(string)
339-
if !ok {
340-
return nil, simplejson.NewFromAny(meta), errors.New("invalid time value")
341-
}
342-
timeValue, err := time.Parse(time.RFC3339Nano, timeStr)
343-
if err != nil {
344-
return nil, simplejson.NewFromAny(meta), err
345-
}
346-
347-
var value float64
348-
if value, err = getFloat(r[valueIndex]); err != nil {
349-
return nil, simplejson.NewFromAny(meta), err
350-
}
351-
352-
var metricName string
353-
if metricIndex == -1 {
354-
metricName = t.Columns[valueIndex].Name
355-
} else {
356-
metricName, ok = r[metricIndex].(string)
357-
if !ok {
358-
return nil, simplejson.NewFromAny(meta), err
359-
}
360-
}
361-
362-
points := getSeriesBucket(metricName)
363-
*points = append(*points, tsdb.NewTimePoint(null.FloatFrom(value), float64(timeValue.Unix()*1000)))
364-
}
365-
366-
return slice, simplejson.NewFromAny(meta), nil
367-
}
368-
}
369-
370-
return nil, nil, errors.New("no data as no PrimaryResult table was returned in the response")
285+
azlog.Error("failed to encode the query into the encodedQuery property")
371286
}
372287

373-
func createMetadata(model *simplejson.Json, params url.Values) (metadata, error) {
374-
meta := metadata{
375-
Query: params.Get("query"),
376-
Subscription: model.Get("subscriptionId").MustString(),
377-
Workspace: model.Get("azureLogAnalytics").Get("workspace").MustString(),
378-
}
379-
380-
encQuery, err := encodeQuery(meta.Query)
381-
if err != nil {
382-
return meta, err
383-
}
384-
meta.EncodedQuery = encQuery
385-
return meta, nil
386-
}
387-
388-
func encodeQuery(rawQuery string) (string, error) {
288+
// encodeQuery encodes the query in gzip so the frontend can build links.
289+
func encodeQuery(rawQuery string) ([]byte, error) {
389290
var b bytes.Buffer
390291
gz := gzip.NewWriter(&b)
391292
if _, err := gz.Write([]byte(rawQuery)); err != nil {
392-
return "", err
293+
return nil, err
393294
}
394295

395296
if err := gz.Close(); err != nil {
396-
return "", err
297+
return nil, err
397298
}
398299

399-
return base64.StdEncoding.EncodeToString(b.Bytes()), nil
300+
return b.Bytes(), nil
400301
}

0 commit comments

Comments
 (0)