summaryrefslogtreecommitdiff
path: root/src/backend/commands/explain_dr.c
blob: 5715546cf437be1490d884ed79cbb52ff50d2c3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/*-------------------------------------------------------------------------
 *
 * explain_dr.c
 *	  Explain DestReceiver to measure serialization overhead
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/commands/explain_dr.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "commands/explain.h"
#include "commands/explain_dr.h"
#include "commands/explain_state.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "utils/lsyscache.h"

/*
 * DestReceiver functions for SERIALIZE option
 *
 * A DestReceiver for query tuples that serializes the rows it is passed into
 * DataRow messages, measuring the resources expended and the total serialized
 * size, but never sending the data to the client.  This allows measuring the
 * overhead of deTOASTing and datatype out/sendfuncs, which are not otherwise
 * exercisable without actually hitting the network.
 */
/*
 * Private state of the EXPLAIN (SERIALIZE) receiver.
 *
 * "pub" has to remain the first member: the functions in this file cast
 * between DestReceiver * and SerializeDestReceiver *.
 */
typedef struct SerializeDestReceiver
{
	DestReceiver pub;			/* generic receiver header; must be first */
	ExplainState *es;			/* this EXPLAIN statement's ExplainState */
	int8		format;			/* wire format code: 0 = text, 1 = binary */
	TupleDesc	attrinfo;		/* tuple desc that finfos was built for */
	int			nattrs;			/* column count that finfos was built for */
	FmgrInfo   *finfos;			/* per-column output/send function info, or
								 * NULL if none has been looked up */
	MemoryContext tmpcontext;	/* per-row temporary memory context, reset
								 * after every row */
	StringInfoData buf;			/* message buffer, re-used across rows */
	SerializeMetrics metrics;	/* accumulated bytes, time and buffer usage */
} SerializeDestReceiver;

/*
 * serialize_prepare_info - (re)build the per-column function lookup cache
 *
 * Records the tuple descriptor and column count the cache is valid for, then
 * looks up, for every column, the type's output function (text format) or
 * send function (binary format).
 *
 * This does a subset of the work of printtup_prepare_info(): one format
 * applies to all columns here, so no per-column format handling is needed.
 * Any format code other than 0 or 1 is rejected with an error.
 */
static void
serialize_prepare_info(SerializeDestReceiver *receiver,
					   TupleDesc typeinfo, int nattrs)
{
	int			colno;

	/* discard whatever lookup data an earlier descriptor left behind */
	if (receiver->finfos != NULL)
		pfree(receiver->finfos);
	receiver->finfos = NULL;

	/* remember what the cache is going to describe */
	receiver->attrinfo = typeinfo;
	receiver->nattrs = nattrs;

	/* a result without columns needs no lookups at all */
	if (nattrs <= 0)
		return;

	receiver->finfos = (FmgrInfo *) palloc0(nattrs * sizeof(FmgrInfo));

	for (colno = 0; colno < nattrs; colno++)
	{
		Form_pg_attribute column = TupleDescAttr(typeinfo, colno);
		FmgrInfo   *colinfo = &receiver->finfos[colno];
		Oid			funcoid;
		bool		isvarlena;

		switch (receiver->format)
		{
			case 0:
				/* text wire format: use the type's output function */
				getTypeOutputInfo(column->atttypid, &funcoid, &isvarlena);
				fmgr_info(funcoid, colinfo);
				break;

			case 1:
				/* binary wire format: use the type's send function */
				getTypeBinaryOutputInfo(column->atttypid, &funcoid, &isvarlena);
				fmgr_info(funcoid, colinfo);
				break;

			default:
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("unsupported format code: %d", receiver->format)));
				break;
		}
	}
}

/*
 * serializeAnalyzeReceive - process one tuple for EXPLAIN (SERIALIZE)
 *
 * Builds the same message printtup() in printtup.c would build for the row,
 * but only adds its length to the byte counter instead of transmitting it.
 * Elapsed time and buffer usage are accumulated too, when the corresponding
 * EXPLAIN options are enabled.  Keep this in step with printtup(), apart
 * from the measurement code.
 *
 * Always returns true.
 */
static bool
serializeAnalyzeReceive(TupleTableSlot *slot, DestReceiver *self)
{
	SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;
	TupleDesc	tupdesc = slot->tts_tupleDescriptor;
	int			ncols = tupdesc->natts;
	StringInfo	msg = &receiver->buf;
	MemoryContext callercxt;
	instr_time	t_begin,
				t_end;
	BufferUsage usage_before;

	/* take the starting measurements, but only those that were asked for */
	if (receiver->es->timing)
		INSTR_TIME_SET_CURRENT(t_begin);
	if (receiver->es->buffers)
		usage_before = pgBufferUsage;

	/* rebuild the function lookup cache if the row shape has changed */
	if (receiver->attrinfo != tupdesc || receiver->nattrs != ncols)
		serialize_prepare_info(receiver, tupdesc, ncols);

	/* every column must be available in tts_values/tts_isnull */
	slot_getallattrs(slot);

	/* work in the per-row context, so the row's garbage is cheap to drop */
	callercxt = MemoryContextSwitchTo(receiver->tmpcontext);

	/*
	 * Start a DataRow message.  The buffer itself was set up at startup and
	 * does not live in the per-row context.  Filling a StringInfo exactly as
	 * printtup() does means the string handling cost is measured as well.
	 */
	pq_beginmessage_reuse(msg, PqMsg_DataRow);

	pq_sendint16(msg, ncols);

	/* append each column of the tuple in turn */
	for (int colno = 0; colno < ncols; colno++)
	{
		FmgrInfo   *colinfo = &receiver->finfos[colno];
		Datum		value = slot->tts_values[colno];

		if (slot->tts_isnull[colno])
		{
			/* a NULL is represented by a length word of -1 and no data */
			pq_sendint32(msg, -1);
		}
		else if (receiver->format == 0)
		{
			/* text format: run the type's output function */
			char	   *text = OutputFunctionCall(colinfo, value);

			pq_sendcountedtext(msg, text, strlen(text));
		}
		else
		{
			/* binary format: run the type's send function */
			bytea	   *bytes = SendFunctionCall(colinfo, value);

			pq_sendint32(msg, VARSIZE(bytes) - VARHDRSZ);
			pq_sendbytes(msg, VARDATA(bytes),
						 VARSIZE(bytes) - VARHDRSZ);
		}
	}

	/*
	 * Do not call pq_endmessage_reuse() here: that would really ship the
	 * message to the client.  Counting its size is all we want.  The buffer
	 * can stay as it is, because the next row resets it (just as happens in
	 * printtup()).
	 */
	receiver->metrics.bytesSent += msg->len;

	/* back to the caller's context, then throw away the row's allocations */
	MemoryContextSwitchTo(callercxt);
	MemoryContextReset(receiver->tmpcontext);

	/* add this row's elapsed time to the total */
	if (receiver->es->timing)
	{
		INSTR_TIME_SET_CURRENT(t_end);
		INSTR_TIME_ACCUM_DIFF(receiver->metrics.timeSpent, t_end, t_begin);
	}

	/* add this row's buffer activity to the total */
	if (receiver->es->buffers)
		BufferUsageAccumDiff(&receiver->metrics.bufferUsage,
							 &pgBufferUsage,
							 &usage_before);

	return true;
}

/*
 * serializeAnalyzeStartup - prepare the serializeAnalyze receiver for a run
 *
 * Derives the wire format code from the EXPLAIN SERIALIZE setting, creates
 * the per-row memory context and the message buffer, and zeroes the metrics.
 * The operation and typeinfo arguments are not used.
 */
static void
serializeAnalyzeStartup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	SerializeDestReceiver *myState = (SerializeDestReceiver *) self;

	Assert(myState->es != NULL);

	/* translate the SERIALIZE option into a wire protocol format code */
	switch (myState->es->serialize)
	{
		case EXPLAIN_SERIALIZE_TEXT:
			myState->format = 0;	/* text */
			break;
		case EXPLAIN_SERIALIZE_BINARY:
			myState->format = 1;	/* binary */
			break;
		case EXPLAIN_SERIALIZE_NONE:
			/* this receiver should not be in use without SERIALIZE */
			Assert(false);
			break;
	}

	/* memory context that is reset after each row */
	myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
												"SerializeTupleReceive",
												ALLOCSET_DEFAULT_SIZES);

	/* one output buffer serves all rows, as in printtup.c */
	initStringInfo(&myState->buf);

	/* start all result counters from zero */
	memset(&myState->metrics, 0, sizeof(myState->metrics));
	INSTR_TIME_SET_ZERO(myState->metrics.timeSpent);
}

/*
 * serializeAnalyzeShutdown - shut down the serializeAnalyze receiver
 *
 * Releases the function lookup cache, the message buffer and the per-row
 * memory context.  The collected metrics are left untouched, so they can
 * still be read after shutdown.
 */
static void
serializeAnalyzeShutdown(DestReceiver *self)
{
	SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;

	if (receiver->finfos)
		pfree(receiver->finfos);
	receiver->finfos = NULL;

	/*
	 * Forget which tuple descriptor the lookup cache was built for.
	 * serializeAnalyzeReceive() rebuilds the cache only when the incoming
	 * descriptor or column count differs from these fields; leaving them set
	 * after freeing finfos would let a later run with the same descriptor
	 * skip the rebuild and index a NULL array.
	 */
	receiver->attrinfo = NULL;
	receiver->nattrs = 0;

	if (receiver->buf.data)
		pfree(receiver->buf.data);
	receiver->buf.data = NULL;

	if (receiver->tmpcontext)
		MemoryContextDelete(receiver->tmpcontext);
	receiver->tmpcontext = NULL;
}

/*
 * serializeAnalyzeDestroy - release the serializeAnalyze receiver itself
 *
 * Only the receiver struct is freed here; the resources hanging off it are
 * released by serializeAnalyzeShutdown().
 */
static void
serializeAnalyzeDestroy(DestReceiver *self)
{
	SerializeDestReceiver *receiver = (SerializeDestReceiver *) self;

	pfree(receiver);
}

/*
 * CreateExplainSerializeDestReceiver - make a receiver for EXPLAIN (SERIALIZE)
 *
 * Returns a zero-initialized SerializeDestReceiver with its callbacks and
 * destination tag filled in and "es" remembered for later use.  Everything
 * else is set up when the receiver's startup callback runs.
 */
DestReceiver *
CreateExplainSerializeDestReceiver(ExplainState *es)
{
	SerializeDestReceiver *receiver;

	receiver = (SerializeDestReceiver *) palloc0(sizeof(*receiver));

	/* identify the destination and hook up the callbacks */
	receiver->pub.mydest = DestExplainSerialize;
	receiver->pub.rStartup = serializeAnalyzeStartup;
	receiver->pub.receiveSlot = serializeAnalyzeReceive;
	receiver->pub.rShutdown = serializeAnalyzeShutdown;
	receiver->pub.rDestroy = serializeAnalyzeDestroy;

	/* the EXPLAIN state tells us the format and what to measure */
	receiver->es = es;

	return &receiver->pub;
}

/*
 * GetSerializationMetrics - fetch the metrics a receiver has collected
 *
 * The receiver is not necessarily ours: when the explained statement is
 * CREATE TABLE AS, it can be an IntoRel receiver.  So check the destination
 * tag first, and hand back all-zeroes metrics for any receiver that is not a
 * serialize receiver.
 */
SerializeMetrics
GetSerializationMetrics(DestReceiver *dest)
{
	SerializeMetrics result;

	if (dest->mydest != DestExplainSerialize)
	{
		/* not our receiver: report that nothing was serialized */
		memset(&result, 0, sizeof(result));
		INSTR_TIME_SET_ZERO(result.timeSpent);
	}
	else
		result = ((SerializeDestReceiver *) dest)->metrics;

	return result;
}