Skip to content

Commit 671128a

Browse files
committed
Addressed comments: Revived the getMissingValueInterpretationMap method and added Integration Test.
1 parent 547c4ff commit 671128a

File tree

5 files changed

+141
-10
lines changed

5 files changed

+141
-10
lines changed

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -197,21 +197,11 @@
197197
<differenceType>1001</differenceType>
198198
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
199199
</difference>
200-
<difference>
201-
<differenceType>7002</differenceType>
202-
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
203-
<method>java.util.Map getMissingValueInterpretationMap()</method>
204-
</difference>
205200
<difference>
206201
<differenceType>7002</differenceType>
207202
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
208203
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
209204
</difference>
210-
<difference>
211-
<differenceType>7002</differenceType>
212-
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
213-
<method>java.util.Map getMissingValueInterpretationMap()</method>
214-
</difference>
215205
<difference>
216206
<differenceType>7002</differenceType>
217207
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ public long getInflightWaitSeconds() {
119119
return this.schemaAwareStreamWriter.getInflightWaitSeconds();
120120
}
121121

122+
/** @return the missing value interpretation map used for the writer. */
123+
public Map<String, AppendRowsRequest.MissingValueInterpretation>
124+
getMissingValueInterpretationMap() {
125+
return this.schemaAwareStreamWriter.getMissingValueInterpretationMap();
126+
}
127+
122128
/**
123129
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
124130
* StreamWriter by default.

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,12 @@ public long getInflightWaitSeconds() {
299299
return streamWriter.getInflightWaitSeconds();
300300
}
301301

302+
/** @return the missing value interpretation map used for the writer. */
303+
public Map<String, AppendRowsRequest.MissingValueInterpretation>
304+
getMissingValueInterpretationMap() {
305+
return streamWriter.getMissingValueInterpretationMap();
306+
}
307+
302308
/** Sets all StreamWriter settings. */
303309
private void setStreamWriterSettings(
304310
@Nullable TransportChannelProvider channelProvider,

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
893893
.setUpdatedSchema(UPDATED_TABLE_SCHEMA)
894894
.build());
895895
testBigQueryWrite.addResponse(createAppendResponse(1));
896+
// Verify the map before the writer is refreshed
897+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
896898
testBigQueryWrite.addResponse(createAppendResponse(2));
897899
testBigQueryWrite.addResponse(createAppendResponse(3));
898900

@@ -944,6 +946,8 @@ public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
944946
testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
945947
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
946948

949+
// Verify the map after the writer is refreshed
950+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
947951
assertEquals(
948952
testBigQueryWrite.getAppendRequests().get(3).getDefaultMissingValueInterpretation(),
949953
MissingValueInterpretation.DEFAULT_VALUE);
@@ -1618,6 +1622,8 @@ public void testAppendWithMissingValueMap() throws Exception {
16181622
.setTraceId("test:empty")
16191623
.build()) {
16201624

1625+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
1626+
16211627
testBigQueryWrite.addResponse(
16221628
AppendRowsResponse.newBuilder()
16231629
.setAppendResult(

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,6 +1306,129 @@ public void testJsonStreamWriterSchemaUpdateConcurrent()
13061306
}
13071307
}
13081308

1309+
@Test
1310+
public void testJsonStreamWriterSchemaUpdateWithMissingValueInterpretationMap()
1311+
throws DescriptorValidationException, ExecutionException, IOException, InterruptedException,
1312+
ParseException {
1313+
String tableName = "SchemaUpdateMissingValueMapTestTable";
1314+
TableId tableId = TableId.of(DATASET, tableName);
1315+
tableInfo = TableInfo.newBuilder(tableId, defaultValueTableDefinition).build();
1316+
bigquery.create(tableInfo);
1317+
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
1318+
WriteStream writeStream =
1319+
client.createWriteStream(
1320+
CreateWriteStreamRequest.newBuilder()
1321+
.setParent(parent.toString())
1322+
.setWriteStream(
1323+
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
1324+
.build());
1325+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
1326+
missingValueMap.put(
1327+
"foo_with_default", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1328+
missingValueMap.put(
1329+
"date_with_default_to_current", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1330+
1331+
try (JsonStreamWriter jsonStreamWriter =
1332+
JsonStreamWriter.newBuilder(writeStream.getName(), client)
1333+
.setMissingValueInterpretationMap(missingValueMap)
1334+
.build()) {
1335+
// Verify the missing value map
1336+
assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
1337+
1338+
// First append with the current schema
1339+
JSONObject jsonObject = new JSONObject();
1340+
jsonObject.put("bar_without_default", "existing_col_before_update");
1341+
JSONArray jsonArr = new JSONArray();
1342+
jsonArr.put(jsonObject);
1343+
ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr, 0);
1344+
assertEquals(0, response1.get().getAppendResult().getOffset().getValue());
1345+
1346+
// Add a column to the table
1347+
Field newCol =
1348+
Field.newBuilder("new_col_without_default", StandardSQLTypeName.STRING)
1349+
.setMode(Field.Mode.NULLABLE)
1350+
.build();
1351+
ArrayList<Field> updatedFields =
1352+
new ArrayList<>(defaultValueTableDefinition.getSchema().getFields());
1353+
updatedFields.add(newCol);
1354+
Schema updatedSchema = Schema.of(updatedFields);
1355+
TableInfo updatedTableInfo =
1356+
TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
1357+
Table updatedTable = bigquery.update(updatedTableInfo);
1358+
assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
1359+
1360+
// Continue writing rows until backend acknowledges schema update
1361+
JSONObject jsonObject2 = new JSONObject();
1362+
jsonObject2.put("bar_without_default", "no_schema_update_yet");
1363+
JSONArray jsonArr2 = new JSONArray();
1364+
jsonArr2.put(jsonObject2);
1365+
1366+
int nextI = 0;
1367+
for (int i = 1; i < 100; i++) {
1368+
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
1369+
assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
1370+
if (response2.get().hasUpdatedSchema()) {
1371+
nextI = i + 1;
1372+
break;
1373+
} else {
1374+
Thread.sleep(1000);
1375+
}
1376+
}
1377+
1378+
// Write using the new schema with 10 new requests
1379+
JSONObject updatedCol = new JSONObject();
1380+
updatedCol.put("bar_without_default", "existing_col");
1381+
updatedCol.put("new_col_without_default", "new_col");
1382+
JSONArray updatedJsonArr = new JSONArray();
1383+
updatedJsonArr.put(updatedCol);
1384+
for (int i = nextI; i < nextI + 10; i++) {
1385+
ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(updatedJsonArr, i);
1386+
assertEquals(i, response3.get().getAppendResult().getOffset().getValue());
1387+
}
1388+
1389+
// List all rows to verify table data correctness
1390+
Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
1391+
1392+
// Verify 1st row (with "existing_col_before_update")
1393+
FieldValueList currentRow = rowsIter.next();
1394+
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
1395+
assertEquals("existing_col_before_update", currentRow.get(1).getStringValue());
1396+
assertFalse(currentRow.get(2).getStringValue().isEmpty());
1397+
// Check whether the recorded value is close enough.
1398+
Instant parsedInstant =
1399+
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
1400+
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
1401+
1402+
// A few rows (with "no_schema_update_yet") until the schema was updated
1403+
for (int j = 1; j < nextI; j++) {
1404+
currentRow = rowsIter.next();
1405+
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
1406+
assertEquals("no_schema_update_yet", currentRow.get(1).getStringValue());
1407+
// Check whether the recorded value is close enough.
1408+
parsedInstant =
1409+
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
1410+
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
1411+
}
1412+
// 10 rows after schema update with new column included
1413+
for (int j = nextI; j < nextI + 10; j++) {
1414+
currentRow = rowsIter.next();
1415+
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
1416+
assertEquals("existing_col", currentRow.get(1).getStringValue());
1417+
assertFalse(currentRow.get(2).getStringValue().isEmpty());
1418+
// Check whether the recorded value is close enough.
1419+
parsedInstant =
1420+
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
1421+
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));
1422+
// Verify the new column
1423+
assertEquals("new_col", currentRow.get(3).getStringValue());
1424+
}
1425+
assertFalse(rowsIter.hasNext());
1426+
1427+
// Verify that the missing value map hasn't changed
1428+
assertEquals(missingValueMap, jsonStreamWriter.getMissingValueInterpretationMap());
1429+
}
1430+
}
1431+
13091432
@Test
13101433
public void testJsonStreamWriterWithFlexibleColumnName()
13111434
throws IOException, InterruptedException, ExecutionException,

0 commit comments

Comments
 (0)