fix: add minimum timeout to getQueryResults API requests (#444)
* fix: add minimum timeout to getQueryResults API requests

Since successful responses can still take a long time to download, apply
a minimum timeout that should accommodate 99.9%+ of responses.

I figure it's more important that *any* timeout is set, if one is desired,
than that the exact requested value is used. This is especially true when
a short timeout is requested for the purposes of a progress bar: making
forward progress matters more than the progress bar's update frequency.
(A minimal sketch of this clamping follows the change list below.)

* docs: document minimum timeout value

* test: remove redundant query timeout test

* test: change assertion for done method

* chore: remove unused import
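
A minimal sketch of the clamping pattern described above, under the assumption that it reduces to a ``max()`` against a module-level floor; the constant name mirrors the one added to client.py in this commit, while ``clamp_timeout`` is a hypothetical helper, not a library function:

# Sketch only: mirrors the max(timeout, floor) pattern from this commit.
# _MIN_GET_QUERY_RESULTS_TIMEOUT matches the constant added to client.py;
# clamp_timeout is a hypothetical helper, not part of google-cloud-bigquery.
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120  # seconds


def clamp_timeout(timeout):
    """Raise a caller-supplied transport timeout to the module-level floor."""
    if timeout is None:
        # No timeout requested: leave the transport unbounded.
        return None
    return max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)


assert clamp_timeout(None) is None  # unchanged when no timeout is desired
assert clamp_timeout(1) == 120      # too-short timeouts are raised to the floor
assert clamp_timeout(300) == 300    # generous timeouts pass through untouched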
tswast authored Jan 8, 2021
1 parent 0023d19 commit 015a73e
Showing 4 changed files with 64 additions and 26 deletions.
22 changes: 20 additions & 2 deletions google/cloud/bigquery/client.py
@@ -93,6 +93,14 @@
)
_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"

# In microbenchmarks, it's been shown that even in ideal conditions (query
# finished, local data), requests to getQueryResults can take 10+ seconds.
# In less-than-ideal situations, the response can take even longer, as it must
# be able to download a full 100+ MB row in that time. Don't let the
# connection time out before data can be downloaded.
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120


class Project(object):
"""Wrapper for resource describing a BigQuery project.
@@ -1570,7 +1578,9 @@ def _get_query_results(
location (Optional[str]): Location of the query job.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. If set, this connection timeout may be
increased to a minimum value. This prevents retries on what
would otherwise be a successful response.
Returns:
google.cloud.bigquery.query._QueryResults:
@@ -1579,6 +1589,9 @@

extra_params = {"maxResults": 0}

if timeout is not None:
timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)

if project is None:
project = self.project

@@ -3293,7 +3306,9 @@ def _list_rows_from_query_results(
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. If set, this connection timeout may be
increased to a minimum value. This prevents retries on what
would otherwise be a successful response.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
Returns:
@@ -3306,6 +3321,9 @@
"location": location,
}

if timeout is not None:
timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)

if start_index is not None:
params["startIndex"] = start_index

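
One distinction worth keeping in mind when reading the hunks above: ``timeoutMs`` is a getQueryResults query parameter that bounds how long the BigQuery backend waits for the query to complete before responding (possibly with ``jobComplete: false``), while ``timeout`` bounds how long the local HTTP transport waits for any response at all. Only the transport timeout is clamped. The sketch below illustrates that reading and is not the library's internal code; ``api_request`` stands in for the connection object's request method:

# Illustrative only: api_request is a placeholder for the internal
# connection call; the clamp mirrors the change in client.py above.
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120


def get_query_results(api_request, project, job_id, timeout_ms=None, timeout=None):
    """Issue a getQueryResults call with a clamped transport timeout."""
    params = {"maxResults": 0}
    if timeout_ms is not None:
        # Server-side wait for query completion; passed through unchanged.
        params["timeoutMs"] = timeout_ms
    if timeout is not None:
        # Local transport timeout; never allowed below the floor.
        timeout = max(timeout, _MIN_GET_QUERY_RESULTS_TIMEOUT)
    path = "/projects/{}/queries/{}".format(project, job_id)
    return api_request(method="GET", path=path, query_params=params, timeout=timeout)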
33 changes: 11 additions & 22 deletions tests/system.py
Expand Up @@ -27,7 +27,6 @@
import uuid
import re

import requests
import psutil
import pytest
import pytz
@@ -1798,15 +1797,25 @@ def test_query_w_wrong_config(self):
Config.CLIENT.query(good_query, job_config=bad_config).result()

def test_query_w_timeout(self):
job_config = bigquery.QueryJobConfig()
job_config.use_query_cache = False

query_job = Config.CLIENT.query(
"SELECT * FROM `bigquery-public-data.github_repos.commits`;",
job_id_prefix="test_query_w_timeout_",
location="US",
job_config=job_config,
)

with self.assertRaises(concurrent.futures.TimeoutError):
# 1 second is much too short for this query.
query_job.result(timeout=1)

# Even though the query takes >1 second, the call to getQueryResults
# should succeed.
self.assertFalse(query_job.done(timeout=1))

Config.CLIENT.cancel_job(query_job.job_id, location=query_job.location)

def test_query_w_page_size(self):
page_size = 45
query_job = Config.CLIENT.query(
@@ -2408,26 +2417,6 @@ def test_query_iter(self):
row_tuples = [r.values() for r in query_job]
self.assertEqual(row_tuples, [(1,)])

def test_querying_data_w_timeout(self):
job_config = bigquery.QueryJobConfig()
job_config.use_query_cache = False

query_job = Config.CLIENT.query(
"""
SELECT COUNT(*)
FROM UNNEST(GENERATE_ARRAY(1,1000000)), UNNEST(GENERATE_ARRAY(1, 10000))
""",
location="US",
job_config=job_config,
)

# Specify a very tight deadline to demonstrate that the timeout
# actually has effect.
with self.assertRaises(requests.exceptions.Timeout):
query_job.done(timeout=0.1)

Config.CLIENT.cancel_job(query_job.job_id, location=query_job.location)

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_query_results_to_dataframe(self):
QUERY = """
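
The updated system test above captures the user-visible effect of this change: a short ``timeout`` still bounds how long ``result()`` waits for the query, but the getQueryResults HTTP request itself now gets at least 120 seconds, so ``done()`` returns normally instead of raising a transport timeout. A hypothetical usage sketch (the public-dataset query is just an example of a slow job):

from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT * FROM `bigquery-public-data.github_repos.commits`")

# Before this fix, a 1-second timeout could surface as
# requests.exceptions.Timeout from the getQueryResults request itself.
# Now the transport gets at least 120 seconds, so done() simply reports
# whether the job has finished (False here, since the query is slow).
print(job.done(timeout=1))

# result(timeout=1) still raises concurrent.futures.TimeoutError when the
# query has not finished within the requested second.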
6 changes: 6 additions & 0 deletions tests/unit/job/test_query.py
@@ -1046,6 +1046,8 @@ def test_result_invokes_begins(self):
self.assertEqual(reload_request[1]["method"], "GET")

def test_result_w_timeout(self):
import google.cloud.bigquery.client

begun_resource = self._make_resource()
query_resource = {
"jobComplete": True,
@@ -1072,6 +1074,10 @@ def test_result_w_page_size(self):
"/projects/{}/queries/{}".format(self.PROJECT, self.JOB_ID),
)
self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900)
self.assertEqual(
query_request[1]["timeout"],
google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT,
)
self.assertEqual(reload_request[1]["method"], "GET")

def test_result_w_page_size(self):
29 changes: 27 additions & 2 deletions tests/unit/test_client.py
@@ -311,7 +311,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
project="other-project",
location=self.LOCATION,
timeout_ms=500,
timeout=42,
timeout=420,
)

final_attributes.assert_called_once_with({"path": path}, client, None)
@@ -320,7 +320,32 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
method="GET",
path=path,
query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
timeout=42,
timeout=420,
)

def test__get_query_results_miss_w_short_timeout(self):
import google.cloud.bigquery.client
from google.cloud.exceptions import NotFound

creds = _make_credentials()
client = self._make_one(self.PROJECT, creds)
conn = client._connection = make_connection()
path = "/projects/other-project/queries/nothere"
with self.assertRaises(NotFound):
client._get_query_results(
"nothere",
None,
project="other-project",
location=self.LOCATION,
timeout_ms=500,
timeout=1,
)

conn.api_request.assert_called_once_with(
method="GET",
path=path,
query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
timeout=google.cloud.bigquery.client._MIN_GET_QUERY_RESULTS_TIMEOUT,
)

def test__get_query_results_miss_w_client_location(self):
