@@ -93,6 +93,26 @@ public class ConnectionImplTest {
9393 .setTotalRows (BigInteger .valueOf (1L ))
9494 .setSchema (FAST_QUERY_TABLESCHEMA );
9595
96+ private static final GetQueryResultsResponse GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA =
97+ new GetQueryResultsResponse ()
98+ .setJobReference (QUERY_JOB .toPb ())
99+ .setRows (ImmutableList .of (TABLE_ROW ))
100+ .setJobComplete (false )
101+ .setPageToken (PAGE_TOKEN )
102+ .setTotalBytesProcessed (42L )
103+ .setTotalRows (BigInteger .valueOf (1L ))
104+ .setSchema (null );
105+
106+ private static List <TableRow > TABLE_ROWS =
107+ ImmutableList .of (
108+ new TableRow ()
109+ .setF (
110+ ImmutableList .of (new TableCell ().setV ("Value1" ), new TableCell ().setV ("Value2" ))),
111+ new TableRow ()
112+ .setF (
113+ ImmutableList .of (
114+ new TableCell ().setV ("Value3" ), new TableCell ().setV ("Value4" ))));
115+
96116 private BigQueryOptions createBigQueryOptionsForProject (
97117 String project , BigQueryRpcFactory rpcFactory ) {
98118 return BigQueryOptions .newBuilder ()
@@ -211,24 +231,13 @@ public void testQueryDryRun() throws BigQuerySQLException {
211231
212232 @ Test
213233 public void testParseDataTask () throws InterruptedException {
214- List <TableRow > tableRows =
215- ImmutableList .of (
216- new TableRow ()
217- .setF (
218- ImmutableList .of (
219- new TableCell ().setV ("Value1" ), new TableCell ().setV ("Value2" ))),
220- new TableRow ()
221- .setF (
222- ImmutableList .of (
223- new TableCell ().setV ("Value3" ), new TableCell ().setV ("Value4" ))));
224-
225234 BlockingQueue <Tuple <Iterable <FieldValueList >, Boolean >> pageCache =
226235 new LinkedBlockingDeque <>(2 );
227236 BlockingQueue <Tuple <TableDataList , Boolean >> rpcResponseQueue = new LinkedBlockingDeque <>(2 );
228237 rpcResponseQueue .offer (Tuple .of (null , false ));
229238 // This call should populate page cache
230239 ConnectionImpl connectionSpy = Mockito .spy (connection );
231- connectionSpy .parseRpcDataAsync (tableRows , QUERY_SCHEMA , pageCache , rpcResponseQueue );
240+ connectionSpy .parseRpcDataAsync (TABLE_ROWS , QUERY_SCHEMA , pageCache , rpcResponseQueue );
232241 Tuple <Iterable <FieldValueList >, Boolean > fvlTupple =
233242 pageCache .take (); // wait for the parser thread to parse the data
234243 assertNotNull (fvlTupple );
@@ -247,16 +256,6 @@ public void testParseDataTask() throws InterruptedException {
247256
248257 @ Test
249258 public void testPopulateBuffer () throws InterruptedException {
250- List <TableRow > tableRows =
251- ImmutableList .of (
252- new TableRow ()
253- .setF (
254- ImmutableList .of (
255- new TableCell ().setV ("Value1" ), new TableCell ().setV ("Value2" ))),
256- new TableRow ()
257- .setF (
258- ImmutableList .of (
259- new TableCell ().setV ("Value3" ), new TableCell ().setV ("Value4" ))));
260259
261260 BlockingQueue <Tuple <Iterable <FieldValueList >, Boolean >> pageCache =
262261 new LinkedBlockingDeque <>(2 );
@@ -266,7 +265,7 @@ public void testPopulateBuffer() throws InterruptedException {
266265 // This call should populate page cache
267266 ConnectionImpl connectionSpy = Mockito .spy (connection );
268267
269- connectionSpy .parseRpcDataAsync (tableRows , QUERY_SCHEMA , pageCache , rpcResponseQueue );
268+ connectionSpy .parseRpcDataAsync (TABLE_ROWS , QUERY_SCHEMA , pageCache , rpcResponseQueue );
270269
271270 verify (connectionSpy , times (1 ))
272271 .parseRpcDataAsync (
@@ -358,19 +357,62 @@ public void testLegacyQuerySinglePage() throws BigQuerySQLException {
358357 .createJobForQuery (any (com .google .api .services .bigquery .model .Job .class ));
359358 }
360359
360+ // calls executeSelect with a Fast query and emulates that no schema is returned with the first
361+ // page
362+ @ Test
363+ public void testFastQueryNullSchema () throws BigQuerySQLException {
364+ ConnectionImpl connectionSpy = Mockito .spy (connection );
365+ QueryRequest queryReqMock = new QueryRequest ();
366+ com .google .api .services .bigquery .model .JobStatistics stats =
367+ new com .google .api .services .bigquery .model .JobStatistics ()
368+ .setQuery (new JobStatistics2 ().setSchema (FAST_QUERY_TABLESCHEMA ));
369+ com .google .api .services .bigquery .model .Job jobResponseMock =
370+ new com .google .api .services .bigquery .model .Job ()
371+ // .setConfiguration(QUERY_JOB.g)
372+ .setJobReference (QUERY_JOB .toPb ())
373+ .setId (JOB )
374+ .setStatus (new com .google .api .services .bigquery .model .JobStatus ().setState ("DONE" ))
375+ .setStatistics (stats );
376+ // emulating a legacy query
377+ doReturn (true ).when (connectionSpy ).isFastQuerySupported ();
378+ com .google .api .services .bigquery .model .QueryResponse mockQueryRes =
379+ new QueryResponse ()
380+ .setSchema (FAST_QUERY_TABLESCHEMA )
381+ .setJobComplete (false ) // so that it goes to the else part in queryRpc
382+ .setTotalRows (new BigInteger (String .valueOf (4L )))
383+ .setJobReference (QUERY_JOB .toPb ())
384+ .setRows (TABLE_ROWS );
385+ when (bigqueryRpcMock .queryRpc (any (String .class ), any (QueryRequest .class )))
386+ .thenReturn (mockQueryRes );
387+ doReturn (GET_QUERY_RESULTS_RESPONSE_NULL_SCHEMA ) // wiring the null schema for the test case
388+ .when (connectionSpy )
389+ .getQueryResultsFirstPage (any (JobId .class ));
390+ doReturn (BQ_RS_MOCK_RES )
391+ .when (connectionSpy )
392+ .getSubsequentQueryResultsWithJob (
393+ any (Long .class ),
394+ any (Long .class ),
395+ any (JobId .class ),
396+ any (GetQueryResultsResponse .class ),
397+ any (Schema .class ),
398+ any (Boolean .class ));
399+ doReturn (jobResponseMock ).when (connectionSpy ).createDryRunJob (any (String .class ));
400+ BigQueryResult res = connectionSpy .executeSelect (SQL_QUERY );
401+ assertEquals (res .getTotalRows (), 2 );
402+ assertEquals (QUERY_SCHEMA , res .getSchema ());
403+ verify (connectionSpy , times (1 ))
404+ .getSubsequentQueryResultsWithJob (
405+ any (Long .class ),
406+ any (Long .class ),
407+ any (JobId .class ),
408+ any (GetQueryResultsResponse .class ),
409+ any (Schema .class ),
410+ any (Boolean .class ));
411+ }
412+
361413 // exercises getSubsequentQueryResultsWithJob for fast running queries
362414 @ Test
363415 public void testFastQueryLongRunning () throws SQLException {
364- List <TableRow > tableRows =
365- ImmutableList .of (
366- new TableRow ()
367- .setF (
368- ImmutableList .of (
369- new TableCell ().setV ("Value1" ), new TableCell ().setV ("Value2" ))),
370- new TableRow ()
371- .setF (
372- ImmutableList .of (
373- new TableCell ().setV ("Value3" ), new TableCell ().setV ("Value4" ))));
374416 ConnectionImpl connectionSpy = Mockito .spy (connection );
375417 // emulating a fast query
376418 doReturn (true ).when (connectionSpy ).isFastQuerySupported ();
@@ -389,7 +431,7 @@ public void testFastQueryLongRunning() throws SQLException {
389431 .setJobComplete (false )
390432 .setTotalRows (new BigInteger (String .valueOf (4L )))
391433 .setJobReference (QUERY_JOB .toPb ())
392- .setRows (tableRows );
434+ .setRows (TABLE_ROWS );
393435 when (bigqueryRpcMock .queryRpc (any (String .class ), any (QueryRequest .class )))
394436 .thenReturn (mockQueryRes );
395437 BigQueryResult res = connectionSpy .executeSelect (SQL_QUERY );
0 commit comments