diff --git a/CHANGELOG.md b/CHANGELOG.md index e016fa98..e3ee87ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +-Added automatic retry for datetime searches on index-not-found errors to resolve cache race conditions. [#564](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/564) + ### Changed ### Fixed diff --git a/README.md b/README.md index b050691b..e06c80a3 100644 --- a/README.md +++ b/README.md @@ -518,7 +518,7 @@ You can customize additional settings in your `.env` file: | `PROPERTIES_END_DATETIME_FIELD` | Specifies the field used for the upper value of a datetime range for the items in the backend database. | `properties.end_datetime` | Optional | | `COLLECTION_FIELD` | Specifies the field used for the collection an item belongs to in the backend database | `collection` | Optional | | `GEOMETRY_FIELD` | Specifies the field containing the geometry of the items in the backend database | `geometry` | Optional | - +| `STAC_SEARCH_MAX_RETRIES` | Maximum number of retry attempts for datetime search queries when index is not found | 3 | Optional | > [!NOTE] > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, `ES_VERIFY_CERTS` and `ES_TIMEOUT` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch. diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index 24a58885..425896ea 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -4,9 +4,12 @@ such as converting bounding boxes to polygon representations. """ +import asyncio import logging import os -from typing import Any, Dict, List, Optional, Set, Union +import random +from functools import wraps +from typing import Any, Callable, Dict, List, Optional, Set, Union from stac_fastapi.types.stac import Item @@ -195,3 +198,44 @@ def get_excluded_from_items(obj: dict, field_path: str) -> None: return current.pop(final, None) + + +def datetime_search_retry(func: Callable) -> Callable: + """Retry decorator for datetime search queries for NotFoundError/ConnectionError.""" + + @wraps(func) + async def wrapper(self, *args, **kwargs): + datetime_search = kwargs.get("datetime_search") + + is_datetime_query = bool(datetime_search and isinstance(datetime_search, dict)) + + if not is_datetime_query: + return await func(self, *args, **kwargs) + + base_delay = 0.5 + max_retries = int(os.getenv("STAC_SEARCH_MAX_RETRIES", "3")) + + # Validate max_retries is between 1 and 10 + if max_retries < 1 or max_retries > 10: + max_retries = 3 + + for attempt in range(max_retries): + try: + return await func(self, *args, **kwargs) + except Exception as e: + error_name = type(e).__name__.lower() + error_msg = str(e).lower() + + retry_error = ( + "notfound" in error_name + or "connection" in error_name + or "not found" in error_msg + ) + + if not retry_error or attempt == max_retries - 1: + raise + delay = base_delay * (2**attempt) + random.uniform(0, 0.1) + await asyncio.sleep(delay) + raise Exception("Search failed after retries") + + return wrapper diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index c3eadc7a..9db2b96d 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -22,7 +22,12 @@ CollectionSerializer, ItemSerializer, ) -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env +from stac_fastapi.core.utilities import ( + MAX_LIMIT, + bbox2polygon, + datetime_search_retry, + get_bool_env, +) from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.config import ( ElasticsearchSettings as SyncElasticsearchSettings, @@ -804,6 +809,7 @@ def populate_sort(sortby: List) -> Optional[Dict[str, Dict[str, str]]]: """ return populate_sort_shared(sortby=sortby) + @datetime_search_retry async def execute_search( self, search: Search, diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 78d13d9c..4f150dc5 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -18,7 +18,12 @@ import stac_fastapi.sfeos_helpers.filter as filter_module from stac_fastapi.core.base_database_logic import BaseDatabaseLogic from stac_fastapi.core.serializers import CollectionSerializer, ItemSerializer -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon, get_bool_env +from stac_fastapi.core.utilities import ( + MAX_LIMIT, + bbox2polygon, + datetime_search_retry, + get_bool_env, +) from stac_fastapi.extensions.core.transaction.request import ( PartialCollection, PartialItem, @@ -804,6 +809,7 @@ def populate_sort(sortby: List) -> Optional[Dict[str, Dict[str, str]]]: """ return populate_sort_shared(sortby=sortby) + @datetime_search_retry async def execute_search( self, search: Search, diff --git a/stac_fastapi/tests/api/test_api.py b/stac_fastapi/tests/api/test_api.py index 587fcb85..684913b2 100644 --- a/stac_fastapi/tests/api/test_api.py +++ b/stac_fastapi/tests/api/test_api.py @@ -7,6 +7,7 @@ import pytest +from stac_fastapi.core.utilities import datetime_search_retry from stac_fastapi.types.errors import ConflictError from ..conftest import create_collection, create_item @@ -1722,3 +1723,62 @@ async def test_hide_private_data_from_item(app_client, txn_client, load_test_dat assert "private_data" not in item["properties"] del os.environ["EXCLUDED_FROM_ITEMS"] + + +@pytest.mark.asyncio +async def test_datetime_search_retry_succeeds_on_retry(): + """Test retry works for datetime queries with NotFoundError.""" + call_counter = {"count": 0} + + @datetime_search_retry + async def mock_search(self, datetime_search=None): + call_counter["count"] += 1 + if call_counter["count"] < 2: + raise Exception("not found: Index not found.") + return "success" + + result = await mock_search(None, datetime_search={"datetime": "2025-01-01"}) + + assert result == "success" + assert call_counter["count"] == 2 + + +@pytest.mark.asyncio +async def test_datetime_search_no_retry_for_none_datetime(): + """Test that retry is invoked only for datetime queries.""" + call_counter = {"count": 0} + + @datetime_search_retry + async def mock_search(self, datetime_search=None): + call_counter["count"] += 1 + if call_counter["count"] < 2: + raise Exception("not found: Index not found.") + return "success" + + with pytest.raises(Exception): + await mock_search(None, datetime_search={}) + + assert call_counter["count"] == 1 + call_counter["count"] = 0 + + with pytest.raises(Exception): + await mock_search(None, datetime_search=None) + + assert call_counter["count"] == 1 + + +@pytest.mark.asyncio +async def test_datetime_search_max_retries(): + """Test that retry stops after max attempts.""" + call_counter = {"count": 0} + + @datetime_search_retry + async def mock_search(self, datetime_search=None): + call_counter["count"] += 1 + raise Exception(f"Not found: Fails, attempt {call_counter['count']}") + + with pytest.raises(Exception) as exc_info: + await mock_search(None, datetime_search={"start_datetime": "2025-01-01"}) + + assert call_counter["count"] == 3 + assert "attempt 3" in str(exc_info.value).lower()