Skip to content

Commit 391ddc3

Browse files
committed
chore(spanner): executor framework instance admin actions implementation
1 parent a9f6c3f commit 391ddc3

3 files changed

Lines changed: 202 additions & 28 deletions

File tree

handwritten/spanner/google-cloud-spanner-executor/src/cloud-client-executor.ts

Lines changed: 197 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,27 @@
1414
* limitations under the License.
1515
*/
1616

17-
import {ServerDuplexStream, status} from '@grpc/grpc-js';
18-
import {Spanner} from '../../src';
19-
import {trace, context, Tracer} from '@opentelemetry/api';
17+
import { ServerDuplexStream, status } from '@grpc/grpc-js';
18+
import { Spanner } from '../../src';
19+
import { trace, context, Tracer } from '@opentelemetry/api';
2020
import * as protos from '../../protos/protos';
21-
import {CloudUtil} from './cloud-util';
22-
import {OutcomeSender, ExecutionFlowContextInterface} from './cloud-executor';
21+
import { CloudUtil } from './cloud-util';
22+
import {
23+
OutcomeSender,
24+
ExecutionFlowContextInterface,
25+
CloudExecutor,
26+
} from './cloud-executor';
2327
import spanner = protos.google.spanner;
2428
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
2529
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;
30+
import SpannerActionOutcome = spanner.executor.v1.SpannerActionOutcome;
2631
import ISpannerAction = spanner.executor.v1.ISpannerAction;
2732
import IAdminAction = spanner.executor.v1.IAdminAction;
2833
import ICreateCloudInstanceAction = spanner.executor.v1.ICreateCloudInstanceAction;
34+
import IUpdateCloudInstanceAction = spanner.executor.v1.IUpdateCloudInstanceAction;
35+
import IDeleteCloudInstanceAction = spanner.executor.v1.IDeleteCloudInstanceAction;
36+
import IListCloudInstancesAction = spanner.executor.v1.IListCloudInstancesAction;
37+
import IGetCloudInstanceAction = spanner.executor.v1.IGetCloudInstanceAction;
2938

3039
/**
3140
* Context for a single stream connection.
@@ -66,9 +75,11 @@ export class ExecutionFlowContext implements ExecutionFlowContextInterface {
6675
* Sends an error back to the client.
6776
*/
6877
public onError(error: Error): void {
69-
const stream = this.call as any;
70-
71-
if (this.call.cancelled || stream.destroyed || stream.writable === false) {
78+
if (
79+
this.call.cancelled ||
80+
this.call.destroyed ||
81+
this.call.writable === false
82+
) {
7283
console.warn(
7384
'Attempted to emit error to a closed or cancelled stream.',
7485
error,
@@ -99,6 +110,23 @@ export class CloudClientExecutor {
99110
action as ICreateCloudInstanceAction,
100111
sender,
101112
),
113+
updateCloudInstance: (action, sender) =>
114+
this.executeUpdateCloudInstance(
115+
action as IUpdateCloudInstanceAction,
116+
sender,
117+
),
118+
deleteCloudInstance: (action, sender) =>
119+
this.executeDeleteCloudInstance(
120+
action as IDeleteCloudInstanceAction,
121+
sender,
122+
),
123+
listCloudInstances: (action, sender) =>
124+
this.executeListCloudInstances(
125+
action as IListCloudInstancesAction,
126+
sender,
127+
),
128+
getCloudInstance: (action, sender) =>
129+
this.executeGetCloudInstance(action as IGetCloudInstanceAction, sender),
102130
};
103131

104132
private readonly actionRegistry: Record<string, ActionHandler> = {
@@ -130,7 +158,7 @@ export class CloudClientExecutor {
130158
public startHandlingRequest(
131159
req: SpannerAsyncActionRequest,
132160
executionContext: ExecutionFlowContext,
133-
): {code: number; details: string} {
161+
): { code: number; details: string } {
134162
const outcomeSender = new OutcomeSender(req.actionId!, executionContext);
135163

136164
if (!req.action) {
@@ -144,7 +172,7 @@ export class CloudClientExecutor {
144172
outcomeSender.finishWithError(err);
145173
});
146174

147-
return {code: status.OK, details: ''};
175+
return { code: status.OK, details: '' };
148176
}
149177

150178
/**
@@ -155,10 +183,8 @@ export class CloudClientExecutor {
155183
action: ISpannerAction,
156184
): Promise<void> {
157185
const actionType =
158-
Object.keys(action).find(
159-
k =>
160-
action[k as keyof typeof action] !== undefined &&
161-
!!this.actionRegistry[k],
186+
Object.keys(this.actionRegistry).find(
187+
k => action[k as keyof typeof action] !== undefined,
162188
) || 'unknown';
163189
const span = this.tracer.startSpan(`performaction_${actionType}`);
164190

@@ -195,10 +221,8 @@ export class CloudClientExecutor {
195221
sender: OutcomeSender,
196222
): Promise<void> {
197223
try {
198-
const adminType = Object.keys(action).find(
199-
k =>
200-
action[k as keyof typeof action] !== undefined &&
201-
!!this.adminActionRegistry[k],
224+
const adminType = Object.keys(this.adminActionRegistry).find(
225+
k => action[k as keyof typeof action] !== undefined,
202226
);
203227

204228
if (adminType && this.adminActionRegistry[adminType]) {
@@ -226,21 +250,28 @@ export class CloudClientExecutor {
226250
console.log(`Creating instance: \n${JSON.stringify(action, null, 2)}`);
227251

228252
const instanceId = action.instanceId!;
229-
const projectId = action.projectId!;
253+
const projectId = action.projectId || CloudExecutor.PROJECT_ID;
230254
const configId = action.instanceConfigId!;
231255

232256
const instanceAdminClient = this.spanner.getInstanceAdminClient();
233257

258+
const instancePayload: any = {
259+
config: instanceAdminClient.instanceConfigPath(projectId, configId),
260+
displayName: instanceId,
261+
labels: action.labels || {},
262+
};
263+
264+
if (action.nodeCount !== undefined) {
265+
instancePayload.nodeCount = action.nodeCount;
266+
}
267+
if (action.processingUnits !== undefined) {
268+
instancePayload.processingUnits = action.processingUnits;
269+
}
270+
234271
const [operation] = await instanceAdminClient.createInstance({
235272
parent: instanceAdminClient.projectPath(projectId),
236273
instanceId: instanceId,
237-
instance: {
238-
config: instanceAdminClient.instanceConfigPath(projectId, configId),
239-
displayName: instanceId,
240-
nodeCount: action.nodeCount || 1,
241-
processingUnits: action.processingUnits,
242-
labels: action.labels || {},
243-
},
274+
instance: instancePayload,
244275
});
245276

246277
console.log('Waiting for instance creation operation to complete...');
@@ -259,4 +290,144 @@ export class CloudClientExecutor {
259290
sender.finishWithError(err);
260291
}
261292
}
293+
294+
private async executeUpdateCloudInstance(
295+
action: IUpdateCloudInstanceAction,
296+
sender: OutcomeSender,
297+
): Promise<void> {
298+
try {
299+
console.log(`Updating instance: \n${JSON.stringify(action, null, 2)}`);
300+
301+
const instanceId = action.instanceId!;
302+
const projectId = action.projectId || CloudExecutor.PROJECT_ID;
303+
304+
const instanceAdminClient = this.spanner.getInstanceAdminClient();
305+
306+
const paths: string[] = [];
307+
if (action.displayName !== undefined) paths.push('display_name');
308+
if (action.nodeCount !== undefined) paths.push('node_count');
309+
if (action.processingUnits !== undefined) paths.push('processing_units');
310+
if (action.labels && Object.keys(action.labels).length > 0)
311+
paths.push('labels');
312+
313+
const [operation] = await instanceAdminClient.updateInstance({
314+
instance: {
315+
name: instanceAdminClient.instancePath(projectId, instanceId),
316+
displayName: action.displayName,
317+
nodeCount: action.nodeCount,
318+
processingUnits: action.processingUnits,
319+
labels: action.labels,
320+
},
321+
fieldMask: { paths: paths },
322+
});
323+
324+
console.log('Waiting for instance update operation to complete...');
325+
await operation.promise();
326+
327+
console.log(`Instance ${instanceId} updated successfully.`);
328+
329+
sender.finishWithOK();
330+
} catch (err: any) {
331+
console.error('Failed to update instance:', err);
332+
sender.finishWithError(err);
333+
}
334+
}
335+
336+
private async executeDeleteCloudInstance(
337+
action: IDeleteCloudInstanceAction,
338+
sender: OutcomeSender,
339+
): Promise<void> {
340+
try {
341+
console.log(`Deleting instance: \n${JSON.stringify(action, null, 2)}`);
342+
343+
const instanceId = action.instanceId!;
344+
const projectId = action.projectId || CloudExecutor.PROJECT_ID;
345+
346+
const instanceAdminClient = this.spanner.getInstanceAdminClient();
347+
348+
await instanceAdminClient.deleteInstance({
349+
name: instanceAdminClient.instancePath(projectId, instanceId),
350+
});
351+
352+
console.log(`Instance ${instanceId} deleted successfully.`);
353+
354+
sender.finishWithOK();
355+
} catch (err: any) {
356+
console.error('Failed to delete instance:', err);
357+
sender.finishWithError(err);
358+
}
359+
}
360+
361+
private async executeListCloudInstances(
362+
action: IListCloudInstancesAction,
363+
sender: OutcomeSender,
364+
): Promise<void> {
365+
try {
366+
console.log(`Listing instances: \n${JSON.stringify(action, null, 2)}`);
367+
368+
const projectId = action.projectId || CloudExecutor.PROJECT_ID;
369+
370+
const instanceAdminClient = this.spanner.getInstanceAdminClient();
371+
372+
const [instances, , response] = await instanceAdminClient.listInstances({
373+
parent: instanceAdminClient.projectPath(projectId),
374+
filter: action.filter,
375+
pageSize: action.pageSize,
376+
pageToken: action.pageToken,
377+
});
378+
379+
console.log(`Found ${instances.length} instances.`);
380+
381+
const outcome = SpannerActionOutcome.create({
382+
status: CloudExecutor.toProto(status.OK),
383+
commitTime: { seconds: 0, nanos: 0 },
384+
adminResult: {
385+
instanceResponse: {
386+
listedInstances: instances,
387+
nextPageToken: response?.nextPageToken || '',
388+
},
389+
},
390+
});
391+
392+
sender.sendOutcome(outcome);
393+
} catch (err: any) {
394+
console.error('Failed to list instances:', err);
395+
sender.finishWithError(err);
396+
}
397+
}
398+
399+
private async executeGetCloudInstance(
400+
action: IGetCloudInstanceAction,
401+
sender: OutcomeSender,
402+
): Promise<void> {
403+
try {
404+
console.log(`Getting instance: \n${JSON.stringify(action, null, 2)}`);
405+
406+
const instanceId = action.instanceId!;
407+
const projectId = action.projectId || CloudExecutor.PROJECT_ID;
408+
409+
const instanceAdminClient = this.spanner.getInstanceAdminClient();
410+
411+
const [instance] = await instanceAdminClient.getInstance({
412+
name: instanceAdminClient.instancePath(projectId, instanceId),
413+
});
414+
415+
console.log(`Found instance: ${instance.name}`);
416+
417+
const outcome = SpannerActionOutcome.create({
418+
status: CloudExecutor.toProto(status.OK),
419+
commitTime: { seconds: 0, nanos: 0 },
420+
adminResult: {
421+
instanceResponse: {
422+
instance: instance,
423+
},
424+
},
425+
});
426+
427+
sender.sendOutcome(outcome);
428+
} catch (err: any) {
429+
console.error('Failed to get instance:', err);
430+
sender.finishWithError(err);
431+
}
432+
}
262433
}

handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor-impl.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import {ServerDuplexStream, status} from '@grpc/grpc-js';
18-
import {trace, context, Tracer} from '@opentelemetry/api';
18+
import {trace, context, Tracer, SpanStatusCode} from '@opentelemetry/api';
1919
import {CloudClientExecutor} from './cloud-client-executor';
2020
import * as protos from '../../protos/protos';
2121
import spanner = protos.google.spanner;
@@ -86,6 +86,7 @@ export class CloudExecutorImpl {
8686
context.with(streamContext, () => {
8787
console.error('Client ends the stream with error.', err);
8888
span.recordException(err);
89+
span.setStatus({code: SpanStatusCode.ERROR, message: err.message});
8990
span.end();
9091
executionContext.cleanup();
9192
});

handwritten/spanner/google-cloud-spanner-executor/src/cloud-executor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export class OutcomeSender {
4646
public finishWithOK(): {code: number; details: string} {
4747
const outcome = SpannerActionOutcome.create({
4848
status: CloudExecutor.toProto(status.OK),
49+
commitTime: {seconds: 0, nanos: 0},
4950
});
5051
return this.sendOutcome(outcome);
5152
}
@@ -54,11 +55,12 @@ export class OutcomeSender {
5455
const s = CloudExecutor.toStatus(err);
5556
const outcome = SpannerActionOutcome.create({
5657
status: CloudExecutor.toProto(s.code, s.message),
58+
commitTime: {seconds: 0, nanos: 0},
5759
});
5860
return this.sendOutcome(outcome);
5961
}
6062

61-
private sendOutcome(outcome: SpannerActionOutcome): {
63+
public sendOutcome(outcome: SpannerActionOutcome): {
6264
code: number;
6365
details: string;
6466
} {

0 commit comments

Comments
 (0)