From f60eeedb92423f65a37fdd3971c552610be7e1a7 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 21 Aug 2025 11:08:21 +0100 Subject: [PATCH 01/12] [PRMT-580]- Created a copy of bulk upload service --- lambdas/enums/lambda_error.py | 2 +- lambdas/handlers/mocks/token_handler.py | 4 +- lambdas/handlers/token_handler.py | 4 +- lambdas/services/bulk_upload_service_v2.py | 460 +++++++ .../search_patient_details_service.py | 4 +- lambdas/tests/unit/conftest.py | 1 - .../services/test_bulk_upload_service_v2.py | 1117 +++++++++++++++++ lambdas/utils/audit_logging_setup.py | 1 + lambdas/utils/logging_formatter.py | 3 +- 9 files changed, 1584 insertions(+), 12 deletions(-) create mode 100644 lambdas/services/bulk_upload_service_v2.py create mode 100644 lambdas/tests/unit/services/test_bulk_upload_service_v2.py diff --git a/lambdas/enums/lambda_error.py b/lambdas/enums/lambda_error.py index a3687f518..16a4545b9 100644 --- a/lambdas/enums/lambda_error.py +++ b/lambdas/enums/lambda_error.py @@ -361,7 +361,7 @@ def create_error_body(self, params: Optional[dict] = None, **kwargs) -> str: "err_code": "UC_4001", "message": "Missing GET request query parameters", } - + UploadConfirmResultFilesNotClean = { "err_code": "UC_4005", "message": "Some of the given document references are not referring to clean files", diff --git a/lambdas/handlers/mocks/token_handler.py b/lambdas/handlers/mocks/token_handler.py index 1389ce062..6f7424df3 100644 --- a/lambdas/handlers/mocks/token_handler.py +++ b/lambdas/handlers/mocks/token_handler.py @@ -38,9 +38,7 @@ def lambda_handler(event, context): login_service = LoginService(oidc_service=MockOidcService()) response = login_service.generate_session(state, auth_code) - logger.info( - "User logged in successfully", {"Result": "Successful login"} - ) + logger.info("User logged in successfully", {"Result": "Successful login"}) return ApiGatewayResponse( 200, json.dumps(response), "GET" ).create_api_gateway_response() diff --git a/lambdas/handlers/token_handler.py b/lambdas/handlers/token_handler.py index 204ad7985..19d06df85 100644 --- a/lambdas/handlers/token_handler.py +++ b/lambdas/handlers/token_handler.py @@ -38,9 +38,7 @@ def lambda_handler(event, context): login_service = LoginService(oidc_service=OidcService()) response = login_service.generate_session(state, auth_code) - logger.info( - "User logged in successfully", {"Result": "Successful login"} - ) + logger.info("User logged in successfully", {"Result": "Successful login"}) return ApiGatewayResponse( 200, json.dumps(response), "GET" ).create_api_gateway_response() diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py new file mode 100644 index 000000000..36fd0bf1a --- /dev/null +++ b/lambdas/services/bulk_upload_service_v2.py @@ -0,0 +1,460 @@ +import json +import os +import uuid +from datetime import datetime + +import pydantic +from botocore.exceptions import ClientError +from enums.patient_ods_inactive_status import PatientOdsInactiveStatus +from enums.snomed_codes import SnomedCodes +from enums.upload_status import UploadStatus +from enums.virus_scan_result import VirusScanResult +from models.document_reference import DocumentReference +from models.sqs.pdf_stitching_sqs_message import PdfStitchingSqsMessage +from models.staging_metadata import MetadataFile, StagingMetadata +from repositories.bulk_upload.bulk_upload_dynamo_repository import ( + BulkUploadDynamoRepository, +) +from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository +from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository +from utils.audit_logging_setup import LoggingService +from utils.exceptions import ( + BulkUploadException, + DocumentInfectedException, + InvalidMessageException, + InvalidNhsNumberException, + PatientRecordAlreadyExistException, + PdsErrorException, + PdsTooManyRequestsException, + S3FileNotFoundException, + VirusScanFailedException, + VirusScanNoResultException, +) +from utils.lloyd_george_validator import ( + LGInvalidFilesException, + allowed_to_ingest_ods_code, + getting_patient_info_from_pds, + validate_filename_with_patient_details_lenient, + validate_filename_with_patient_details_strict, + validate_lg_file_names, +) +from utils.request_context import request_context +from utils.unicode_utils import ( + contains_accent_char, + convert_to_nfc_form, + convert_to_nfd_form, +) +from utils.utilities import validate_nhs_number + +logger = LoggingService(__name__) + + +class BulkUploadService: + def __init__(self, strict_mode, pds_fhir_always_true=False): + self.dynamo_repository = BulkUploadDynamoRepository() + self.sqs_repository = BulkUploadSqsRepository() + self.bulk_upload_s3_repository = BulkUploadS3Repository() + self.strict_mode = strict_mode + self.pdf_content_type = "application/pdf" + self.unhandled_messages = [] + self.file_path_cache = {} + self.pdf_stitching_queue_url = os.environ["PDF_STITCHING_SQS_URL"] + self.pds_fhir_always_true = pds_fhir_always_true + + def process_message_queue(self, records: list): + for index, message in enumerate(records, start=1): + try: + logger.info(f"Processing message {index} of {len(records)}") + self.handle_sqs_message(message) + except (PdsTooManyRequestsException, PdsErrorException) as error: + logger.error(error) + + logger.info( + "Cannot validate patient due to PDS responded with Too Many Requests" + ) + logger.info("Cannot process for now due to PDS rate limit reached.") + logger.info( + "All remaining messages in this batch will be returned to sqs queue to retry later." + ) + + all_unprocessed_message = records[index - 1 :] + for unprocessed_message in all_unprocessed_message: + self.sqs_repository.put_sqs_message_back_to_queue( + unprocessed_message + ) + raise BulkUploadException( + "Bulk upload process paused due to PDS rate limit reached" + ) + except ( + ClientError, + InvalidMessageException, + LGInvalidFilesException, + Exception, + ) as error: + self.unhandled_messages.append(message) + logger.info(f"Failed to process current message due to error: {error}") + logger.info("Continue on next message") + + logger.info( + f"Finish Processing successfully {len(records) - len(self.unhandled_messages)} of {len(records)} messages" + ) + if self.unhandled_messages: + logger.info("Unable to process the following messages:") + for message in self.unhandled_messages: + message_body = json.loads(message.get("body", "{}")) + request_context.patient_nhs_no = message_body.get( + "NHS-NO", "no number found" + ) + logger.info(message_body) + + # def handle_sqs_message_v2(self, message: dict): + # logger.info("validate SQS event") + # staging_metadata = self.build_staging_metadata_from_message(message) + # logger.info("SQS event is valid. Validating NHS number and file names") + # + # accepted_reason, patient_ods_code = self.validate_entry(staging_metadata) + # if accepted_reason is None: + # return + # + # logger.info( + # "NHS Number and filename validation complete." + # "Validated strick mode, and if we can access the patient information ex:patient dead" + # " Checking virus scan has marked files as Clean" + # ) + # + # if not self.validate_virus_scan(staging_metadata, patient_ods_code): + # return + # logger.info("Virus result validation complete. Initialising transaction") + # + # self.initiate_transactions() + # logger.info("Transferring files and creating metadata") + # if not self.transfer_files(staging_metadata, patient_ods_code): + # return + # logger.info("File transfer complete. Removing uploaded files from staging bucket") + # self.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket() + # + # logger.info( + # f"Completed file ingestion for patient {staging_metadata.nhs_number}", + # {"Result": "Successful upload"}, + # ) + # logger.info("Reporting transaction successful") + # self.dynamo_repository.write_report_upload_to_dynamo( + # staging_metadata, + # UploadStatus.COMPLETE, + # accepted_reason, + # patient_ods_code, + # ) + # self.add_information_to_stitching_queue(staging_metadata, patient_ods_code, accepted_reason) + + def handle_sqs_message(self, message: dict): + logger.info("Validating SQS event") + patient_ods_code = "" + accepted_reason = None + try: + staging_metadata_json = message["body"] + staging_metadata = StagingMetadata.model_validate_json( + staging_metadata_json + ) + except (pydantic.ValidationError, KeyError) as e: + logger.error(f"Got incomprehensible message: {message}") + logger.error(e) + raise InvalidMessageException(str(e)) + + logger.info("SQS event is valid. Validating NHS number and file names") + + try: + file_names = [ + os.path.basename(metadata.file_path) + for metadata in staging_metadata.files + ] + request_context.patient_nhs_no = staging_metadata.nhs_number + validate_nhs_number(staging_metadata.nhs_number) + validate_lg_file_names(file_names, staging_metadata.nhs_number) + pds_patient_details = getting_patient_info_from_pds( + staging_metadata.nhs_number + ) + patient_ods_code = ( + pds_patient_details.get_ods_code_or_inactive_status_for_gp() + ) + if not self.pds_fhir_always_true: + if not self.strict_mode: + ( + name_validation_accepted_reason, + is_name_validation_based_on_historic_name, + ) = validate_filename_with_patient_details_lenient( + file_names, pds_patient_details + ) + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, name_validation_accepted_reason + ) + else: + is_name_validation_based_on_historic_name = ( + validate_filename_with_patient_details_strict( + file_names, pds_patient_details + ) + ) + if is_name_validation_based_on_historic_name: + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, "Patient matched on historical name" + ) + + if not allowed_to_ingest_ods_code(patient_ods_code): + raise LGInvalidFilesException( + "Patient not registered at your practice" + ) + patient_death_notification_status = ( + pds_patient_details.get_death_notification_status() + ) + if patient_death_notification_status: + deceased_accepted_reason = f"Patient is deceased - {patient_death_notification_status.name}" + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, deceased_accepted_reason + ) + if patient_ods_code is PatientOdsInactiveStatus.RESTRICTED: + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, "PDS record is restricted" + ) + + except ( + InvalidNhsNumberException, + LGInvalidFilesException, + PatientRecordAlreadyExistException, + ) as error: + logger.info( + f"Detected issue related to patient number: {staging_metadata.nhs_number}" + ) + logger.error(error) + logger.info("Will stop processing Lloyd George record for this patient.") + + reason = str(error) + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, UploadStatus.FAILED, reason, patient_ods_code + ) + return + + logger.info( + "NHS Number and filename validation complete. Checking virus scan has marked files as Clean" + ) + + try: + self.resolve_source_file_path(staging_metadata) + self.bulk_upload_s3_repository.check_virus_result( + staging_metadata, self.file_path_cache + ) + except VirusScanNoResultException as e: + logger.info(e) + logger.info( + f"Waiting on virus scan results for: {staging_metadata.nhs_number}, adding message back to queue" + ) + if staging_metadata.retries > 14: + err = ( + "File was not scanned for viruses before maximum retries attempted" + ) + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, UploadStatus.FAILED, err, patient_ods_code + ) + else: + self.sqs_repository.put_staging_metadata_back_to_queue(staging_metadata) + return + except (VirusScanFailedException, DocumentInfectedException) as e: + logger.info(e) + logger.info( + f"Virus scan results check failed for: {staging_metadata.nhs_number}, removing from queue" + ) + logger.info("Will stop processing Lloyd George record for this patient") + + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "One or more of the files failed virus scanner check", + patient_ods_code, + ) + return + except S3FileNotFoundException as e: + logger.info(e) + logger.info( + f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}" + ) + logger.info("Will stop processing Lloyd George record for this patient") + + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "One or more of the files is not accessible from staging bucket", + patient_ods_code, + ) + return + + logger.info("Virus result validation complete. Initialising transaction") + + self.bulk_upload_s3_repository.init_transaction() + self.dynamo_repository.init_transaction() + + logger.info( + "Transaction initialised. Transferring files to main S3 bucket and creating metadata" + ) + + try: + self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) + logger.info( + f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}", + {"Result": "Successful upload"}, + ) + except ClientError as e: + logger.info( + f"Got unexpected error during file transfer: {str(e)}", + {"Result": "Unsuccessful upload"}, + ) + logger.info("Will try to rollback any change to database and bucket") + self.rollback_transaction() + + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "Validation passed but error occurred during file transfer", + patient_ods_code, + ) + return + + logger.info( + "File transfer complete. Removing uploaded files from staging bucket" + ) + self.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket() + + logger.info( + f"Completed file ingestion for patient {staging_metadata.nhs_number}", + {"Result": "Successful upload"}, + ) + logger.info("Reporting transaction successful") + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.COMPLETE, + accepted_reason, + patient_ods_code, + ) + + pdf_stitching_sqs_message = PdfStitchingSqsMessage( + nhs_number=staging_metadata.nhs_number, + snomed_code_doc_type=SnomedCodes.LLOYD_GEORGE.value, + ) + self.sqs_repository.send_message_to_pdf_stitching_queue( + queue_url=self.pdf_stitching_queue_url, + message=pdf_stitching_sqs_message, + ) + logger.info( + f"Message sent to stitching queue for patient {staging_metadata.nhs_number}" + ) + + def resolve_source_file_path(self, staging_metadata: StagingMetadata): + sample_file_path = staging_metadata.files[0].file_path + + if not contains_accent_char(sample_file_path): + logger.info("No accented character detected in file path.") + self.file_path_cache = { + file.file_path: self.strip_leading_slash(file.file_path) + for file in staging_metadata.files + } + return + + logger.info("Detected accented character in file path.") + logger.info("Will take special steps to handle file names.") + + resolved_file_paths = {} + for file in staging_metadata.files: + file_path_in_metadata = file.file_path + file_path_without_leading_slash = self.strip_leading_slash( + file_path_in_metadata + ) + file_path_in_nfc_form = convert_to_nfc_form(file_path_without_leading_slash) + file_path_in_nfd_form = convert_to_nfd_form(file_path_without_leading_slash) + + if self.bulk_upload_s3_repository.file_exists_on_staging_bucket( + file_path_in_nfc_form + ): + resolved_file_paths[file_path_in_metadata] = file_path_in_nfc_form + elif self.bulk_upload_s3_repository.file_exists_on_staging_bucket( + file_path_in_nfd_form + ): + resolved_file_paths[file_path_in_metadata] = file_path_in_nfd_form + else: + logger.info( + "No file matching the provided file path was found on S3 bucket" + ) + logger.info("Please check whether files are named correctly") + raise S3FileNotFoundException( + f"Failed to access file {sample_file_path}" + ) + + self.file_path_cache = resolved_file_paths + + def create_lg_records_and_copy_files( + self, staging_metadata: StagingMetadata, current_gp_ods: str + ): + nhs_number = staging_metadata.nhs_number + for file_metadata in staging_metadata.files: + document_reference = self.convert_to_document_reference( + file_metadata, nhs_number, current_gp_ods + ) + + source_file_key = self.file_path_cache[file_metadata.file_path] + dest_file_key = document_reference.s3_file_key + + self.bulk_upload_s3_repository.copy_to_lg_bucket( + source_file_key=source_file_key, dest_file_key=dest_file_key + ) + s3_bucket_name = self.bulk_upload_s3_repository.lg_bucket_name + + document_reference.file_size = ( + self.bulk_upload_s3_repository.s3_repository.get_file_size( + s3_bucket_name=s3_bucket_name, object_key=dest_file_key + ) + ) + document_reference.set_uploaded_to_true() + document_reference.doc_status = "final" + self.dynamo_repository.create_record_in_lg_dynamo_table(document_reference) + + def rollback_transaction(self): + try: + self.bulk_upload_s3_repository.rollback_transaction() + self.dynamo_repository.rollback_transaction() + logger.info("Rolled back an incomplete transaction") + except ClientError as e: + logger.error( + f"Failed to rollback the incomplete transaction due to error: {e}" + ) + + def convert_to_document_reference( + self, file_metadata: MetadataFile, nhs_number: str, current_gp_ods: str + ) -> DocumentReference: + s3_bucket_name = self.bulk_upload_s3_repository.lg_bucket_name + file_name = os.path.basename(file_metadata.file_path) + if file_metadata.scan_date: + scan_date_formatted = datetime.strptime( + file_metadata.scan_date, "%d/%m/%Y" + ).strftime("%Y-%m-%d") + else: + scan_date_formatted = None + document_reference = DocumentReference( + id=str(uuid.uuid4()), + nhs_number=nhs_number, + file_name=file_name, + s3_bucket_name=s3_bucket_name, + current_gp_ods=current_gp_ods, + custodian=current_gp_ods, + author=file_metadata.gp_practice_code, + document_scan_creation=scan_date_formatted, + doc_status="preliminary", + ) + document_reference.set_virus_scanner_result(VirusScanResult.CLEAN) + + return document_reference + + @staticmethod + def strip_leading_slash(filepath: str) -> str: + # Handle the filepaths irregularity in the given example of metadata.csv, + # where some filepaths begin with '/' and some does not. + return filepath.lstrip("/") + + @staticmethod + def concatenate_acceptance_reason(previous_reasons: str | None, new_reason: str): + return previous_reasons + ", " + new_reason if previous_reasons else new_reason diff --git a/lambdas/services/search_patient_details_service.py b/lambdas/services/search_patient_details_service.py index 55ddcda09..59db240af 100644 --- a/lambdas/services/search_patient_details_service.py +++ b/lambdas/services/search_patient_details_service.py @@ -46,9 +46,7 @@ def handle_search_patient_request(self, nhs_number, update_session=True): if not patient_details.deceased: self._check_authorization(patient_details.general_practice_ods) - logger.info( - "Searched for patient details", {"Result": "Patient found"} - ) + logger.info("Searched for patient details", {"Result": "Patient found"}) if update_session: self._update_session(nhs_number, patient_details.deceased) diff --git a/lambdas/tests/unit/conftest.py b/lambdas/tests/unit/conftest.py index 27fafe7ac..8dc874eb1 100644 --- a/lambdas/tests/unit/conftest.py +++ b/lambdas/tests/unit/conftest.py @@ -4,7 +4,6 @@ from contextlib import contextmanager from dataclasses import dataclass from enum import Enum -from unittest import mock import pytest from botocore.exceptions import ClientError diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py new file mode 100644 index 000000000..7353d232f --- /dev/null +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -0,0 +1,1117 @@ +import json +from copy import copy + +import pytest +from botocore.exceptions import ClientError +from enums.patient_ods_inactive_status import PatientOdsInactiveStatus +from enums.upload_status import UploadStatus +from enums.virus_scan_result import SCAN_RESULT_TAG_KEY, VirusScanResult +from freezegun import freeze_time +from models.pds_models import Patient +from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository +from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository +from services.bulk_upload_service_v2 import BulkUploadService +from tests.unit.conftest import ( + MOCK_LG_BUCKET, + MOCK_STAGING_STORE_BUCKET, + TEST_CURRENT_GP_ODS, +) +from tests.unit.helpers.data.bulk_upload.test_data import ( + TEST_DOCUMENT_REFERENCE, + TEST_FILE_METADATA, + TEST_SQS_10_MESSAGES_AS_LIST, + TEST_SQS_MESSAGE, + TEST_SQS_MESSAGE_SINGLE_FILE, + TEST_SQS_MESSAGE_WITH_INVALID_FILENAME, + TEST_SQS_MESSAGES_AS_LIST, + TEST_STAGING_METADATA, + TEST_STAGING_METADATA_SINGLE_FILE, + TEST_STAGING_METADATA_WITH_INVALID_FILENAME, + build_test_sqs_message, + build_test_staging_metadata_from_patient_name, + make_s3_file_paths, + make_valid_lg_file_names, +) +from tests.unit.helpers.data.pds.pds_patient_response import ( + PDS_PATIENT, + PDS_PATIENT_DECEASED_FORMAL, + PDS_PATIENT_DECEASED_INFORMAL, + PDS_PATIENT_RESTRICTED, +) +from tests.unit.utils.test_unicode_utils import ( + NAME_WITH_ACCENT_NFC_FORM, + NAME_WITH_ACCENT_NFD_FORM, +) +from utils.exceptions import ( + BulkUploadException, + DocumentInfectedException, + InvalidMessageException, + PatientRecordAlreadyExistException, + PdsTooManyRequestsException, + S3FileNotFoundException, + VirusScanNoResultException, +) +from utils.lloyd_george_validator import LGInvalidFilesException + + +@pytest.fixture +def repo_under_test(set_env, mocker): + service = BulkUploadService(strict_mode=True) + mocker.patch.object(service, "dynamo_repository") + mocker.patch.object(service, "sqs_repository") + mocker.patch.object(service, "bulk_upload_s3_repository") + yield service + + +@pytest.fixture +def mock_check_virus_result(mocker): + yield mocker.patch.object(BulkUploadS3Repository, "check_virus_result") + + +@pytest.fixture +def mock_validate_files(mocker): + yield mocker.patch("services.bulk_upload_service_v2.validate_lg_file_names") + + +@pytest.fixture +def mock_pds_service(mocker): + patient = Patient.model_validate(PDS_PATIENT) + mocker.patch( + "services.bulk_upload_service_v2.getting_patient_info_from_pds", + return_value=patient, + ) + yield patient + + +@pytest.fixture +def mock_pds_service_patient_deceased_formal(mocker): + patient = Patient.model_validate(PDS_PATIENT_DECEASED_FORMAL) + mocker.patch( + "services.bulk_upload_service_v2.getting_patient_info_from_pds", + return_value=patient, + ) + yield patient + + +@pytest.fixture +def mock_pds_service_patient_deceased_informal(mocker): + patient = Patient.model_validate(PDS_PATIENT_DECEASED_INFORMAL) + mocker.patch( + "services.bulk_upload_service_v2.getting_patient_info_from_pds", + return_value=patient, + ) + yield patient + + +@pytest.fixture +def mock_pds_service_patient_restricted(mocker): + patient = Patient.model_validate(PDS_PATIENT_RESTRICTED) + mocker.patch( + "services.bulk_upload_service_v2.getting_patient_info_from_pds", + return_value=patient, + ) + yield patient + + +@pytest.fixture +def mock_pds_validation_lenient(mocker): + yield mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return_value=("test string", True), + ) + + +@pytest.fixture +def mock_pds_validation_strict(mocker): + yield mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_strict", + ) + + +@pytest.fixture +def mock_ods_validation(mocker): + yield mocker.patch("services.bulk_upload_service_v2.allowed_to_ingest_ods_code") + + +@pytest.fixture +def mock_handle_sqs_message(mocker): + yield mocker.patch.object(BulkUploadService, "handle_sqs_message") + + +@pytest.fixture +def mock_back_to_queue(mocker): + yield mocker.patch.object(BulkUploadSqsRepository, "put_sqs_message_back_to_queue") + + +def build_resolved_file_names_cache( + file_path_in_metadata: list[str], file_path_in_s3: list[str] +) -> dict: + return dict(zip(file_path_in_metadata, file_path_in_s3)) + + +def test_lambda_handler_process_each_sqs_message_one_by_one( + set_env, mock_handle_sqs_message +): + service = BulkUploadService(True) + + service.process_message_queue(TEST_SQS_MESSAGES_AS_LIST) + + assert mock_handle_sqs_message.call_count == len(TEST_SQS_MESSAGES_AS_LIST) + for message in TEST_SQS_MESSAGES_AS_LIST: + mock_handle_sqs_message.assert_any_call(message) + + +def test_lambda_handler_continue_process_next_message_after_handled_error( + set_env, mock_handle_sqs_message +): + # emulate that unexpected error happen at 2nd message + mock_handle_sqs_message.side_effect = [ + None, + InvalidMessageException, + None, + ] + service = BulkUploadService(True) + service.process_message_queue(TEST_SQS_MESSAGES_AS_LIST) + + assert mock_handle_sqs_message.call_count == len(TEST_SQS_MESSAGES_AS_LIST) + mock_handle_sqs_message.assert_called_with(TEST_SQS_MESSAGES_AS_LIST[2]) + + +def test_lambda_handler_handle_pds_too_many_requests_exception( + set_env, mock_handle_sqs_message, mock_back_to_queue +): + # emulate that unexpected error happen at 7th message + mock_handle_sqs_message.side_effect = ( + [None] * 6 + [PdsTooManyRequestsException] + [None] * 3 + ) + expected_handled_messages = TEST_SQS_10_MESSAGES_AS_LIST[0:6] + expected_unhandled_message = TEST_SQS_10_MESSAGES_AS_LIST[6:] + + service = BulkUploadService(True) + with pytest.raises(BulkUploadException): + service.process_message_queue(TEST_SQS_10_MESSAGES_AS_LIST) + + assert mock_handle_sqs_message.call_count == 7 + + for message in expected_handled_messages: + mock_handle_sqs_message.assert_any_call(message) + + for message in expected_unhandled_message: + mock_back_to_queue.assert_any_call(message) + + +def test_handle_sqs_message_happy_path( + set_env, + mocker, + mock_uuid, + repo_under_test, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_report_upload_complete = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mocker.patch.object(repo_under_test.bulk_upload_s3_repository, "check_virus_result") + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + mock_create_lg_records_and_copy_files.assert_called_with( + TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS + ) + mock_pds_validation_strict.assert_called() + mock_report_upload_complete.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_called() + + +def test_handle_sqs_message_happy_path_single_file( + set_env, + mocker, + mock_uuid, + repo_under_test, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_create_lg_records_and_copy_files.return_value = TEST_DOCUMENT_REFERENCE + mock_report_upload_complete = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mocker.patch.object(repo_under_test.bulk_upload_s3_repository, "check_virus_result") + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE_SINGLE_FILE) + + mock_create_lg_records_and_copy_files.assert_called_with( + TEST_STAGING_METADATA_SINGLE_FILE, TEST_CURRENT_GP_ODS + ) + mock_report_upload_complete.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_called() + + +def set_up_mocks_for_non_ascii_files( + service: BulkUploadService, mocker, patient_name_on_s3: str +): + service.s3_service = mocker.MagicMock() + service.dynamo_repository = mocker.MagicMock() + + expected_s3_file_paths = make_s3_file_paths( + make_valid_lg_file_names(total_number=3, patient_name=patient_name_on_s3) + ) + + def mock_file_exist_on_s3(file_key: str) -> bool: + return file_key in expected_s3_file_paths + + def mock_get_tag_value(s3_bucket_name: str, file_key: str, tag_key: str) -> str: + if ( + s3_bucket_name == MOCK_STAGING_STORE_BUCKET + and tag_key == SCAN_RESULT_TAG_KEY + and file_key in expected_s3_file_paths + ): + return VirusScanResult.CLEAN + + raise RuntimeError( + "Unexpected S3 tag calls during non-ascii file name test case." + ) + + def mock_copy_across_bucket( + source_bucket: str, source_file_key: str, dest_bucket: str, **_kwargs + ): + if ( + source_bucket == MOCK_STAGING_STORE_BUCKET + and dest_bucket == MOCK_LG_BUCKET + and source_file_key in expected_s3_file_paths + ): + return + + raise RuntimeError("Unexpected S3 calls during non-ascii file name test case.") + + service.s3_service.get_tag_value.side_effect = mock_get_tag_value + service.s3_service.copy_across_bucket.side_effect = mock_copy_across_bucket + service.s3_service.file_exists_on_staging_bucket.side_effect = mock_file_exist_on_s3 + + +@pytest.mark.parametrize( + ["patient_name_in_metadata_file", "patient_name_on_s3"], + [ + (NAME_WITH_ACCENT_NFC_FORM, NAME_WITH_ACCENT_NFC_FORM), + (NAME_WITH_ACCENT_NFC_FORM, NAME_WITH_ACCENT_NFD_FORM), + (NAME_WITH_ACCENT_NFD_FORM, NAME_WITH_ACCENT_NFC_FORM), + (NAME_WITH_ACCENT_NFD_FORM, NAME_WITH_ACCENT_NFD_FORM), + ], + ids=["NFC --> NFC", "NFC --> NFD", "NFD --> NFC", "NFD --> NFD"], +) +def test_handle_sqs_message_happy_path_with_non_ascii_filenames( + repo_under_test, + set_env, + mocker, + mock_validate_files, + patient_name_on_s3, + patient_name_in_metadata_file, + mock_pds_validation_strict, + mock_pds_service, + mock_ods_validation, +): + mock_validate_files.return_value = None + repo_under_test.bulk_upload_s3_repository.lg_bucket_name = MOCK_LG_BUCKET + set_up_mocks_for_non_ascii_files(repo_under_test, mocker, patient_name_on_s3) + test_staging_metadata = build_test_staging_metadata_from_patient_name( + patient_name_in_metadata_file + ) + test_sqs_message = build_test_sqs_message(test_staging_metadata) + + repo_under_test.handle_sqs_message(message=test_sqs_message) + + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called() + assert repo_under_test.bulk_upload_s3_repository.check_virus_result.call_count == 1 + assert repo_under_test.bulk_upload_s3_repository.copy_to_lg_bucket.call_count == 3 + + +def test_handle_sqs_message_calls_report_upload_failure_when_patient_record_already_in_repo( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, +): + TEST_STAGING_METADATA.retries = 0 + + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mock_report_upload_failure = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mocked_error = PatientRecordAlreadyExistException( + "Lloyd George already exists for patient, upload cancelled." + ) + mock_validate_files.side_effect = mocked_error + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_create_lg_records_and_copy_files.assert_not_called() + mock_remove_ingested_file_from_source_bucket.assert_not_called() + mock_report_upload_failure.assert_called_with( + TEST_STAGING_METADATA, UploadStatus.FAILED, str(mocked_error), "" + ) + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + + +def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invalid( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, +): + TEST_STAGING_METADATA.retries = 0 + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mock_report_upload_failure = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mocked_error = LGInvalidFilesException( + "One or more of the files do not match naming convention" + ) + mock_validate_files.side_effect = mocked_error + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE_WITH_INVALID_FILENAME) + + mock_create_lg_records_and_copy_files.assert_not_called() + mock_remove_ingested_file_from_source_bucket.assert_not_called() + mock_report_upload_failure.assert_called_with( + TEST_STAGING_METADATA_WITH_INVALID_FILENAME, + UploadStatus.FAILED, + str(mocked_error), + "", + ) + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + + +def test_handle_sqs_message_report_failure_when_document_is_infected( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + mock_report_upload_failure = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + repo_under_test.bulk_upload_s3_repository.check_virus_result.side_effect = ( + DocumentInfectedException + ) + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_report_upload_failure.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "One or more of the files failed virus scanner check", + "Y12345", + ) + mock_create_lg_records_and_copy_files.assert_not_called() + mock_remove_ingested_file_from_source_bucket.assert_not_called() + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + + +def test_handle_sqs_message_report_failure_when_document_not_exist( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + repo_under_test.bulk_upload_s3_repository.check_virus_result.side_effect = ( + S3FileNotFoundException + ) + mock_report_upload_failure = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_report_upload_failure.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "One or more of the files is not accessible from staging bucket", + "Y12345", + ) + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + + +def test_handle_sqs_message_calls_report_upload_successful_when_patient_is_formally_deceased( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service_patient_deceased_formal, + mock_pds_validation_strict, + mock_ods_validation, +): + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_remove_ingested_file_from_source_bucket = ( + repo_under_test.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket + ) + mock_pds_validation_strict.return_value = False + mock_put_staging_metadata_back_to_queue = ( + repo_under_test.sqs_repository.put_staging_metadata_back_to_queue + ) + mock_report_upload = repo_under_test.dynamo_repository.write_report_upload_to_dynamo + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_create_lg_records_and_copy_files.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + mock_put_staging_metadata_back_to_queue.assert_not_called() + + mock_report_upload.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.COMPLETE, + "Patient is deceased - FORMAL", + PatientOdsInactiveStatus.DECEASED, + ) + + +def test_handle_sqs_message_calls_report_upload_successful_when_patient_is_informally_deceased_and_historical( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service_patient_deceased_informal, + mock_pds_validation_strict, + mock_ods_validation, +): + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_pds_validation_strict.return_value = True + mock_remove_ingested_file_from_source_bucket = ( + repo_under_test.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket + ) + mock_put_staging_metadata_back_to_queue = ( + repo_under_test.sqs_repository.put_staging_metadata_back_to_queue + ) + mock_report_upload = repo_under_test.dynamo_repository.write_report_upload_to_dynamo + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_create_lg_records_and_copy_files.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + mock_put_staging_metadata_back_to_queue.assert_not_called() + + mock_report_upload.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.COMPLETE, + "Patient matched on historical name, Patient is deceased - INFORMAL", + "Y12345", + ) + + +def test_handle_sqs_message_calls_report_upload_successful_when_patient_has_historical_name_and_rest( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service_patient_restricted, + mock_pds_validation_strict, + mock_ods_validation, +): + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_pds_validation_strict.return_value = True + mock_remove_ingested_file_from_source_bucket = ( + repo_under_test.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket + ) + mock_put_staging_metadata_back_to_queue = ( + repo_under_test.sqs_repository.put_staging_metadata_back_to_queue + ) + mock_report_upload = repo_under_test.dynamo_repository.write_report_upload_to_dynamo + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_create_lg_records_and_copy_files.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + mock_put_staging_metadata_back_to_queue.assert_not_called() + + mock_report_upload.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.COMPLETE, + "Patient matched on historical name, PDS record is restricted", + "REST", + ) + + +def test_handle_sqs_message_calls_report_upload_successful_when_patient_is_informally_deceased( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service_patient_deceased_informal, + mock_pds_validation_strict, + mock_ods_validation, +): + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_pds_validation_strict.return_value = False + mock_remove_ingested_file_from_source_bucket = ( + repo_under_test.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket + ) + mock_put_staging_metadata_back_to_queue = ( + repo_under_test.sqs_repository.put_staging_metadata_back_to_queue + ) + mock_report_upload = repo_under_test.dynamo_repository.write_report_upload_to_dynamo + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_create_lg_records_and_copy_files.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + mock_put_staging_metadata_back_to_queue.assert_not_called() + + mock_report_upload.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.COMPLETE, + "Patient is deceased - INFORMAL", + "Y12345", + ) + + +def test_handle_sqs_message_put_staging_metadata_back_to_queue_when_virus_scan_result_not_available( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_check_virus_result, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + repo_under_test.bulk_upload_s3_repository.check_virus_result.side_effect = ( + VirusScanNoResultException + ) + mock_report_upload_failure = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mock_put_staging_metadata_back_to_queue = mocker.patch.object( + repo_under_test.sqs_repository, "put_staging_metadata_back_to_queue" + ) + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_put_staging_metadata_back_to_queue.assert_called_with(TEST_STAGING_METADATA) + + mock_report_upload_failure.assert_not_called() + mock_create_lg_records_and_copy_files.assert_not_called() + mock_remove_ingested_file_from_source_bucket.assert_not_called() + repo_under_test.sqs_repository.send_message_to_pdf_stitching_queue.assert_not_called() + + +def test_handle_sqs_message_rollback_transaction_when_validation_pass_but_file_transfer_failed_halfway( + repo_under_test, + set_env, + mocker, + mock_uuid, + mock_check_virus_result, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + repo_under_test.bulk_upload_s3_repository.lg_bucket_name = MOCK_LG_BUCKET + + TEST_STAGING_METADATA.retries = 0 + mock_rollback_transaction_s3 = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, "rollback_transaction" + ) + mock_rollback_transaction_dynamo = mocker.patch.object( + repo_under_test.dynamo_repository, "rollback_transaction" + ) + mock_report_upload_failure = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mock_client_error = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Access Denied"}}, + "GetObject", + ) + + # simulate a client error occur when copying the 3rd file + repo_under_test.bulk_upload_s3_repository.copy_to_lg_bucket.side_effect = [ + None, + None, + mock_client_error, + ] + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_rollback_transaction_dynamo.assert_called() + mock_rollback_transaction_s3.assert_called() + mock_report_upload_failure.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "Validation passed but error occurred during file transfer", + "Y12345", + ) + mock_remove_ingested_file_from_source_bucket.assert_not_called() + assert ( + repo_under_test.dynamo_repository.create_record_in_lg_dynamo_table.call_count + == 2 + ) + + +def test_handle_sqs_message_raise_InvalidMessageException_when_failed_to_extract_data_from_message( + repo_under_test, set_env, mocker +): + invalid_message = {"body": "invalid content"} + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + + with pytest.raises(InvalidMessageException): + repo_under_test.handle_sqs_message(invalid_message) + + mock_create_lg_records_and_copy_files.assert_not_called() + + +def test_validate_files_raise_LGInvalidFilesException_when_file_names_invalid( + repo_under_test, set_env, mock_validate_files +): + TEST_STAGING_METADATA.retries = 0 + invalid_file_name_metadata_as_json = json.dumps( + TEST_STAGING_METADATA_WITH_INVALID_FILENAME.model_dump() + ) + mock_validate_files.side_effect = LGInvalidFilesException + + repo_under_test.handle_sqs_message({"body": invalid_file_name_metadata_as_json}) + + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called() + + +@freeze_time("2023-10-2 13:00:00") +def test_reports_failure_when_max_retries_reached( + set_env, mocker, mock_uuid, repo_under_test, mock_validate_files +): + mocker.patch("uuid.uuid4", return_value="123412342") + + TEST_STAGING_METADATA.retries = 15 + metadata_as_json = json.dumps(TEST_STAGING_METADATA.model_dump()) + mock_validate_files.side_effect = LGInvalidFilesException + repo_under_test.handle_sqs_message({"body": metadata_as_json}) + + repo_under_test.sqs_repository.send_message_with_nhs_number_attr_fifo.assert_not_called() + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called() + + +def test_resolve_source_file_path_when_filenames_dont_have_accented_chars( + set_env, repo_under_test +): + expected = { + file.file_path: file.file_path.lstrip("/") + for file in TEST_STAGING_METADATA.files + } + + repo_under_test.resolve_source_file_path(TEST_STAGING_METADATA) + actual = repo_under_test.file_path_cache + + assert actual == expected + + +@pytest.mark.parametrize( + ["patient_name_in_metadata_file", "patient_name_on_s3"], + [ + (NAME_WITH_ACCENT_NFC_FORM, NAME_WITH_ACCENT_NFC_FORM), + (NAME_WITH_ACCENT_NFC_FORM, NAME_WITH_ACCENT_NFD_FORM), + (NAME_WITH_ACCENT_NFD_FORM, NAME_WITH_ACCENT_NFC_FORM), + (NAME_WITH_ACCENT_NFD_FORM, NAME_WITH_ACCENT_NFD_FORM), + ], + ids=["NFC --> NFC", "NFC --> NFD", "NFD --> NFC", "NFD --> NFD"], +) +def test_resolve_source_file_path_when_filenames_have_accented_chars( + set_env, mocker, patient_name_on_s3, patient_name_in_metadata_file, repo_under_test +): + patient_name = "Évèlynêë François Ågāřdñ" + expected_cache = {} + for i in range(1, 4): + file_path_in_metadata = ( + f"/9000000009/{i}of3_Lloyd_George_Record_" + f"[{patient_name_in_metadata_file}]_[9000000009]_[22-10-2010].pdf" + ) + file_path_on_s3 = f"9000000009/{i}of3_Lloyd_George_Record_[{patient_name}]_[9000000009]_[22-10-2010].pdf" + expected_cache[file_path_in_metadata] = file_path_on_s3 + + set_up_mocks_for_non_ascii_files(repo_under_test, mocker, patient_name_on_s3) + test_staging_metadata = build_test_staging_metadata_from_patient_name( + patient_name_in_metadata_file + ) + repo_under_test.resolve_source_file_path(test_staging_metadata) + actual = repo_under_test.file_path_cache + + assert actual == expected_cache + + +def test_resolves_source_file_path_raise_S3FileNotFoundException_if_filename_cant_match( + set_env, mocker, repo_under_test +): + patient_name_on_s3 = "Some Name That Not Matching Metadata File" + patient_name_in_metadata_file = NAME_WITH_ACCENT_NFC_FORM + repo_under_test.bulk_upload_s3_repository.file_exists_on_staging_bucket.return_value = ( + False + ) + + set_up_mocks_for_non_ascii_files(repo_under_test, mocker, patient_name_on_s3) + test_staging_metadata = build_test_staging_metadata_from_patient_name( + patient_name_in_metadata_file + ) + + with pytest.raises(S3FileNotFoundException): + repo_under_test.resolve_source_file_path(test_staging_metadata) + + +def test_create_lg_records_and_copy_files(set_env, mocker, mock_uuid, repo_under_test): + test_document_reference = copy(TEST_DOCUMENT_REFERENCE) + repo_under_test.convert_to_document_reference = mocker.MagicMock( + return_value=test_document_reference + ) + TEST_STAGING_METADATA.retries = 0 + repo_under_test.resolve_source_file_path(TEST_STAGING_METADATA) + + repo_under_test.create_lg_records_and_copy_files( + TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS + ) + + nhs_number = TEST_STAGING_METADATA.nhs_number + + for file in TEST_STAGING_METADATA.files: + expected_source_file_key = BulkUploadService.strip_leading_slash(file.file_path) + expected_dest_file_key = f"{nhs_number}/{mock_uuid}" + repo_under_test.bulk_upload_s3_repository.copy_to_lg_bucket.assert_any_call( + source_file_key=expected_source_file_key, + dest_file_key=expected_dest_file_key, + ) + assert test_document_reference.uploaded.__eq__(True) + assert repo_under_test.bulk_upload_s3_repository.copy_to_lg_bucket.call_count == 3 + repo_under_test.dynamo_repository.create_record_in_lg_dynamo_table.assert_any_call( + test_document_reference + ) + assert ( + repo_under_test.dynamo_repository.create_record_in_lg_dynamo_table.call_count + == 3 + ) + + +@freeze_time("2024-01-01 12:00:00") +def test_convert_to_document_reference(set_env, mock_uuid, repo_under_test): + TEST_STAGING_METADATA.retries = 0 + repo_under_test.bulk_upload_s3_repository.lg_bucket_name = "test_lg_s3_bucket" + expected = TEST_DOCUMENT_REFERENCE + + actual = repo_under_test.convert_to_document_reference( + file_metadata=TEST_FILE_METADATA, + nhs_number=TEST_STAGING_METADATA.nhs_number, + current_gp_ods=TEST_CURRENT_GP_ODS, + ) + + assert actual == expected + + +@freeze_time("2024-01-01 12:00:00") +def test_convert_to_document_reference_missing_scan_date( + set_env, mock_uuid, repo_under_test +): + TEST_STAGING_METADATA.retries = 0 + repo_under_test.bulk_upload_s3_repository.lg_bucket_name = "test_lg_s3_bucket" + expected = TEST_DOCUMENT_REFERENCE + expected.document_scan_creation = None + TEST_FILE_METADATA.scan_date = "" + + actual = repo_under_test.convert_to_document_reference( + file_metadata=TEST_FILE_METADATA, + nhs_number=TEST_STAGING_METADATA.nhs_number, + current_gp_ods=TEST_CURRENT_GP_ODS, + ) + + assert actual == expected + + TEST_FILE_METADATA.scan_date = "03/09/2022" + TEST_DOCUMENT_REFERENCE.document_scan_creation = "2022-09-03" + + +def test_raise_client_error_from_ssm_with_pds_service( + mock_ods_validation, + repo_under_test, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, +): + mock_client_error = ClientError( + {"Error": {"Code": "500", "Message": "test error"}}, "testing" + ) + mock_ods_validation.side_effect = mock_client_error + with pytest.raises(ClientError): + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + +def test_mismatch_ods_with_pds_service( + repo_under_test, + mock_ods_validation, + set_env, + mock_uuid, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, +): + mock_ods_validation.return_value = False + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "Patient not registered at your practice", + "Y12345", + ) + + +def test_create_lg_records_and_copy_files_client_error( + repo_under_test, + mocker, + set_env, + mock_uuid, + mock_check_virus_result, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + mock_create_lg_records_and_copy_files = mocker.patch.object( + repo_under_test, "create_lg_records_and_copy_files" + ) + mock_rollback_transaction = mocker.patch.object( + repo_under_test, "rollback_transaction" + ) + mock_client_error = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Access Denied"}}, + "GetObject", + ) + mock_create_lg_records_and_copy_files.side_effect = mock_client_error + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_rollback_transaction.assert_called() + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "Validation passed but error occurred during file transfer", + "Y12345", + ) + repo_under_test.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket.assert_not_called() + repo_under_test.dynamo_repository.report_upload_complete.assert_not_called() + + +def test_handle_sqs_message_happy_path_historical_name( + set_env, + mocker, + mock_uuid, + repo_under_test, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_report_upload_complete = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + mocker.patch.object(repo_under_test.bulk_upload_s3_repository, "check_virus_result") + mock_pds_validation_strict.return_value = True + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE) + + mock_create_lg_records_and_copy_files.assert_called_with( + TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS + ) + mock_report_upload_complete.assert_called() + mock_report_upload_complete.assert_called_with( + TEST_STAGING_METADATA, + UploadStatus.COMPLETE, + "Patient matched on historical name", + "Y12345", + ) + mock_remove_ingested_file_from_source_bucket.assert_called() + + +def test_handle_sqs_message_lenient_mode_happy_path( + set_env, + mocker, + mock_uuid, + mock_validate_files, + mock_pds_service, + mock_pds_validation_lenient, + mock_pds_validation_strict, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + service = BulkUploadService(strict_mode=False) + mocker.patch.object(service, "dynamo_repository") + mocker.patch.object(service, "sqs_repository") + mocker.patch.object(service, "bulk_upload_s3_repository") + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_report_upload_complete = mocker.patch.object( + service.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + service.bulk_upload_s3_repository, "remove_ingested_file_from_source_bucket" + ) + mocker.patch.object(service.bulk_upload_s3_repository, "check_virus_result") + + service.handle_sqs_message(message=TEST_SQS_MESSAGE) + mock_create_lg_records_and_copy_files.assert_called_with( + TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS + ) + mock_pds_validation_lenient.assert_called() + mock_pds_validation_strict.assert_not_called() + mock_report_upload_complete.assert_called() + mock_remove_ingested_file_from_source_bucket.assert_called() + + +def test_concatenate_acceptance_reason(repo_under_test): + accepted_reason = None + test_reason = "test_reason_1" + actual_reason = repo_under_test.concatenate_acceptance_reason( + accepted_reason, test_reason + ) + assert actual_reason == test_reason + another_test_reason = "test_reason_2" + another_actual_reason = repo_under_test.concatenate_acceptance_reason( + actual_reason, another_test_reason + ) + assert another_actual_reason == test_reason + ", " + another_test_reason + + +# Handle sqs message tests +def test_handle_sqs_message_happy_path_strict_mode( + set_env, + mocker, + mock_uuid, + repo_under_test, + mock_validate_files, + mock_pds_service, + mock_pds_validation_strict, + mock_ods_validation, + mock_check_virus_result, +): + TEST_STAGING_METADATA.retries = 0 + + mock_check_virus_result_override = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, "check_virus_result" + ) + mock_check_virus_result_override.return_value = VirusScanResult.CLEAN + + mocker.patch.object(BulkUploadService, "create_lg_records_and_copy_files") + mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + + mock_pds_validation_strict.return_value = True + mock_ods_validation.return_value = True + mock_validate_files.return_value = None + + repo_under_test.handle_sqs_message(TEST_SQS_MESSAGE) + + mock_check_virus_result_override.assert_called() + + expected_file_paths = [ + "1of3_Lloyd_George_Record_[Jane Smith]_[9000000009]_[22-10-2010].pdf", + "2of3_Lloyd_George_Record_[Jane Smith]_[9000000009]_[22-10-2010].pdf", + "3of3_Lloyd_George_Record_[Jane Smith]_[9000000009]_[22-10-2010].pdf", + ] + mock_validate_files.assert_called_with( + expected_file_paths, TEST_STAGING_METADATA.nhs_number + ) diff --git a/lambdas/utils/audit_logging_setup.py b/lambdas/utils/audit_logging_setup.py index ce1acd16b..a654bda77 100644 --- a/lambdas/utils/audit_logging_setup.py +++ b/lambdas/utils/audit_logging_setup.py @@ -2,6 +2,7 @@ from utils.logging_formatter import LoggingFormatter + class LoggingService: audit_logger = None diff --git a/lambdas/utils/logging_formatter.py b/lambdas/utils/logging_formatter.py index 75ac2ba0c..c2f8f86dd 100644 --- a/lambdas/utils/logging_formatter.py +++ b/lambdas/utils/logging_formatter.py @@ -3,6 +3,7 @@ from utils.request_context import request_context + class LoggingFormatter(logging.Formatter): """ A JSON formatter that automatically adds key information from the @@ -29,4 +30,4 @@ def format(self, record: logging.LogRecord) -> str: if record.__dict__.get("custom_args", {}) is not None: log_content.update(record.__dict__.get("custom_args", {})) - return json.dumps(log_content) \ No newline at end of file + return json.dumps(log_content) From 55fda20cf1be51f232916961382c15b15c7c5257 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 21 Aug 2025 11:12:44 +0100 Subject: [PATCH 02/12] [PRMT-580]- refactored building staging metadata --- lambdas/services/bulk_upload_service_v2.py | 24 +++--- .../services/test_bulk_upload_service_v2.py | 73 +++++++++++++++++++ 2 files changed, 85 insertions(+), 12 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 36fd0bf1a..2384255dd 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -107,6 +107,16 @@ def process_message_queue(self, records: list): ) logger.info(message_body) + def build_staging_metadata_from_message(self, message: dict) -> StagingMetadata: + logger.info("Validating SQS event") + try: + staging_metadata_json = message["body"] + return StagingMetadata.model_validate_json(staging_metadata_json) + except (pydantic.ValidationError, KeyError) as e: + logger.error(f"Got incomprehensible message: {message}") + logger.error(e) + raise InvalidMessageException(str(e)) + # def handle_sqs_message_v2(self, message: dict): # logger.info("validate SQS event") # staging_metadata = self.build_staging_metadata_from_message(message) @@ -147,21 +157,11 @@ def process_message_queue(self, records: list): # self.add_information_to_stitching_queue(staging_metadata, patient_ods_code, accepted_reason) def handle_sqs_message(self, message: dict): - logger.info("Validating SQS event") patient_ods_code = "" accepted_reason = None - try: - staging_metadata_json = message["body"] - staging_metadata = StagingMetadata.model_validate_json( - staging_metadata_json - ) - except (pydantic.ValidationError, KeyError) as e: - logger.error(f"Got incomprehensible message: {message}") - logger.error(e) - raise InvalidMessageException(str(e)) - + logger.info("validate SQS event") + staging_metadata = self.build_staging_metadata_from_message(message) logger.info("SQS event is valid. Validating NHS number and file names") - try: file_names = [ os.path.basename(metadata.file_path) diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index 7353d232f..9d89af3de 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -1115,3 +1115,76 @@ def test_handle_sqs_message_happy_path_strict_mode( mock_validate_files.assert_called_with( expected_file_paths, TEST_STAGING_METADATA.nhs_number ) + + +def test_handle_sqs_message_happy_path_v2(mocker, repo_under_test): + mock_metadata = TEST_STAGING_METADATA + mock_staging_metadata = mocker.patch.object( + repo_under_test, + "build_staging_metadata_from_message", + return_value=mock_metadata, + ) + + mock_validate_entry = mocker.patch.object( + repo_under_test, "validate_entry", return_value=("some reason", "Y12345") + ) + + mock_validate_virus_scan = mocker.patch.object( + repo_under_test, "validate_virus_scan", return_value=True + ) + + mock_initiate_transactions = mocker.patch.object( + repo_under_test, "initiate_transactions" + ) + mock_transfer_files = mocker.patch.object( + repo_under_test, "transfer_files", return_value=True + ) + + mock_remove_files = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "remove_ingested_file_from_source_bucket", + ) + + mock_write_report = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + + mock_add_to_stitching_queue = mocker.patch.object( + repo_under_test, "add_information_to_stitching_queue" + ) + + repo_under_test.handle_sqs_message_v2(TEST_SQS_MESSAGE) + + mock_staging_metadata.assert_called_once_with(TEST_SQS_MESSAGE) + mock_validate_entry.assert_called_once_with(mock_metadata) + mock_validate_virus_scan.assert_called_once_with(mock_metadata, "Y12345") + mock_initiate_transactions.assert_called_once() + mock_transfer_files.assert_called_once_with(mock_metadata, "Y12345") + mock_remove_files.assert_called_once() + mock_write_report.assert_called_once_with( + mock_metadata, UploadStatus.COMPLETE, "some reason", "Y12345" + ) + mock_add_to_stitching_queue.assert_called_once_with( + mock_metadata, "Y12345", "some reason" + ) + + +def test_build_staging_metadata_from_message(repo_under_test): + result = repo_under_test.build_staging_metadata_from_message(TEST_SQS_MESSAGE) + assert ( + result.nhs_number + == TEST_SQS_MESSAGE["messageAttributes"]["NhsNumber"]["stringValue"] + ) + assert len(result.files) > 0 + assert result.retries == 0 + + +def test_build_staging_metadata_from_message_with_missing_body(repo_under_test): + with pytest.raises(InvalidMessageException): + repo_under_test.build_staging_metadata_from_message({}) + + +def test_build_staging_metadata_from_message_with_invalid_json(repo_under_test): + bad_message = {"body": '{"invalid_json": }'} + with pytest.raises(InvalidMessageException): + repo_under_test.build_staging_metadata_from_message(bad_message) From 8ab0fdfce92e04db8339d3175f8725d0d4889e07 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 21 Aug 2025 11:27:24 +0100 Subject: [PATCH 03/12] [PRMT-580]- refactored validate entry --- lambdas/services/bulk_upload_service_v2.py | 183 ++++++----- .../services/test_bulk_upload_service_v2.py | 297 ++++++++++++++++++ 2 files changed, 409 insertions(+), 71 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 2384255dd..5d33af125 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -117,6 +117,113 @@ def build_staging_metadata_from_message(self, message: dict) -> StagingMetadata: logger.error(e) raise InvalidMessageException(str(e)) + def validate_filenames(self, staging_metadata: StagingMetadata): + file_names = [ + os.path.basename(metadata.file_path) for metadata in staging_metadata.files + ] + request_context.patient_nhs_no = staging_metadata.nhs_number + validate_nhs_number(staging_metadata.nhs_number) + validate_lg_file_names(file_names, staging_metadata.nhs_number) + + def validate_accessing_patient_data( + self, + file_names: list[str], + pds_patient_details, + patient_ods_code: str, + ) -> str | None: + + if self.pds_fhir_always_true: + return None + accepted_reason = None + + if self.strict_mode: + is_name_validation_based_on_historic_name = ( + validate_filename_with_patient_details_strict( + file_names, pds_patient_details + ) + ) + else: + ( + name_validation_accepted_reason, + is_name_validation_based_on_historic_name, + ) = validate_filename_with_patient_details_lenient( + file_names, pds_patient_details + ) + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, name_validation_accepted_reason + ) + + if is_name_validation_based_on_historic_name: + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, "Patient matched on historical name" + ) + if not allowed_to_ingest_ods_code(patient_ods_code): + raise LGInvalidFilesException("Patient not registered at your practice") + patient_death_notification_status = ( + pds_patient_details.get_death_notification_status() + ) + if patient_death_notification_status: + deceased_accepted_reason = ( + f"Patient is deceased - {patient_death_notification_status.name}" + ) + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, deceased_accepted_reason + ) + if patient_ods_code is PatientOdsInactiveStatus.RESTRICTED: + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, "PDS record is restricted" + ) + + return accepted_reason + + def validate_entry( + self, staging_metadata: StagingMetadata + ) -> tuple[str | None, str | None]: + patient_ods_code = "" + try: + self.validate_filenames(staging_metadata) + file_names = [ + os.path.basename(metadata.file_path) + for metadata in staging_metadata.files + ] + + # Fetch PDS details and ODS code early + pds_patient_details = getting_patient_info_from_pds( + staging_metadata.nhs_number + ) + patient_ods_code = ( + pds_patient_details.get_ods_code_or_inactive_status_for_gp() + ) + + accepted_reason = self.validate_accessing_patient_data( + file_names, + pds_patient_details, + patient_ods_code, + ) + + return accepted_reason, patient_ods_code + + except ( + InvalidNhsNumberException, + LGInvalidFilesException, + PatientRecordAlreadyExistException, + ) as error: + logger.info( + f"Detected issue related to patient number: {staging_metadata.nhs_number}" + ) + logger.error(error) + logger.info("Will stop processing Lloyd George record for this patient.") + + reason = str(error) + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + reason, + patient_ods_code if "patient_ods_code" in locals() else None, + ) + + return None, None + # def handle_sqs_message_v2(self, message: dict): # logger.info("validate SQS event") # staging_metadata = self.build_staging_metadata_from_message(message) @@ -157,83 +264,17 @@ def build_staging_metadata_from_message(self, message: dict) -> StagingMetadata: # self.add_information_to_stitching_queue(staging_metadata, patient_ods_code, accepted_reason) def handle_sqs_message(self, message: dict): - patient_ods_code = "" - accepted_reason = None logger.info("validate SQS event") staging_metadata = self.build_staging_metadata_from_message(message) logger.info("SQS event is valid. Validating NHS number and file names") - try: - file_names = [ - os.path.basename(metadata.file_path) - for metadata in staging_metadata.files - ] - request_context.patient_nhs_no = staging_metadata.nhs_number - validate_nhs_number(staging_metadata.nhs_number) - validate_lg_file_names(file_names, staging_metadata.nhs_number) - pds_patient_details = getting_patient_info_from_pds( - staging_metadata.nhs_number - ) - patient_ods_code = ( - pds_patient_details.get_ods_code_or_inactive_status_for_gp() - ) - if not self.pds_fhir_always_true: - if not self.strict_mode: - ( - name_validation_accepted_reason, - is_name_validation_based_on_historic_name, - ) = validate_filename_with_patient_details_lenient( - file_names, pds_patient_details - ) - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, name_validation_accepted_reason - ) - else: - is_name_validation_based_on_historic_name = ( - validate_filename_with_patient_details_strict( - file_names, pds_patient_details - ) - ) - if is_name_validation_based_on_historic_name: - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, "Patient matched on historical name" - ) - - if not allowed_to_ingest_ods_code(patient_ods_code): - raise LGInvalidFilesException( - "Patient not registered at your practice" - ) - patient_death_notification_status = ( - pds_patient_details.get_death_notification_status() - ) - if patient_death_notification_status: - deceased_accepted_reason = f"Patient is deceased - {patient_death_notification_status.name}" - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, deceased_accepted_reason - ) - if patient_ods_code is PatientOdsInactiveStatus.RESTRICTED: - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, "PDS record is restricted" - ) - - except ( - InvalidNhsNumberException, - LGInvalidFilesException, - PatientRecordAlreadyExistException, - ) as error: - logger.info( - f"Detected issue related to patient number: {staging_metadata.nhs_number}" - ) - logger.error(error) - logger.info("Will stop processing Lloyd George record for this patient.") - - reason = str(error) - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, UploadStatus.FAILED, reason, patient_ods_code - ) + accepted_reason, patient_ods_code = self.validate_entry(staging_metadata) + if accepted_reason is None: return logger.info( - "NHS Number and filename validation complete. Checking virus scan has marked files as Clean" + "NHS Number and filename validation complete." + "Validated strick mode, and if we can access the patient information ex:patient dead" + " Checking virus scan has marked files as Clean" ) try: diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index 9d89af3de..ad77a2ed5 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -1,4 +1,5 @@ import json +import os from copy import copy import pytest @@ -46,6 +47,7 @@ BulkUploadException, DocumentInfectedException, InvalidMessageException, + InvalidNhsNumberException, PatientRecordAlreadyExistException, PdsTooManyRequestsException, S3FileNotFoundException, @@ -53,6 +55,8 @@ ) from utils.lloyd_george_validator import LGInvalidFilesException +from lambdas.models.staging_metadata import MetadataFile, StagingMetadata + @pytest.fixture def repo_under_test(set_env, mocker): @@ -1188,3 +1192,296 @@ def test_build_staging_metadata_from_message_with_invalid_json(repo_under_test): bad_message = {"body": '{"invalid_json": }'} with pytest.raises(InvalidMessageException): repo_under_test.build_staging_metadata_from_message(bad_message) + + +def test_validate_filenames(repo_under_test, mocker): + test_file_path = "/9730787212/1of20_Lloyd_George_Record_[Brad Edmond Avery]_[9730787212]_[13-09-2006].pdf" + test_nhs_number = "9730787212" + + metadata_file_data = { + "FILEPATH": test_file_path, + "PAGE COUNT": "20", + "NHS-NO": test_nhs_number, + "GP-PRACTICE-CODE": "Y12345", + "SECTION": "SectionA", + "SUB-SECTION": None, + "SCAN-DATE": "13-09-2006", + "SCAN-ID": "SCAN123", + "USER-ID": "USER456", + "UPLOAD": "UPLOAD789", + } + metadata_file = MetadataFile.parse_obj(metadata_file_data) + + staging_metadata_data = { + "nhs_number": test_nhs_number, + "files": [metadata_file], + "retries": 0, + } + staging_metadata = StagingMetadata.parse_obj(staging_metadata_data) + + mock_validate_nhs = mocker.patch( + "services.bulk_upload_service_v2.validate_nhs_number" + ) + mock_validate_lg = mocker.patch( + "services.bulk_upload_service_v2.validate_lg_file_names" + ) + + repo_under_test.validate_filenames(staging_metadata) + + mock_validate_nhs.assert_called_once_with(test_nhs_number) + mock_validate_lg.assert_called_once_with( + [os.path.basename(test_file_path)], + test_nhs_number, + ) + + +@pytest.fixture +def mock_patient(mocker): + patient = mocker.Mock() + patient.get_death_notification_status.return_value = None + return patient + + +def test_validate_entry_happy_path(mocker, repo_under_test, mock_patient): + staging_metadata = TEST_STAGING_METADATA + + mock_validate_filenames = mocker.patch.object(repo_under_test, "validate_filenames") + mock_getting_patient_info_from_pds = mocker.patch( + "services.bulk_upload_service_v2.getting_patient_info_from_pds" + ) + mock_patient = mocker.Mock() + mock_patient.get_ods_code_or_inactive_status_for_gp.return_value = "Y12345" + mock_getting_patient_info_from_pds.return_value = mock_patient + + mock_validate_accessing_patient_data = mocker.patch.object( + repo_under_test, "validate_accessing_patient_data", return_value="some reason" + ) + + accepted_reason, patient_ods_code = repo_under_test.validate_entry(staging_metadata) + + mock_validate_filenames.assert_called_once_with(staging_metadata) + mock_getting_patient_info_from_pds.assert_called_once_with( + staging_metadata.nhs_number + ) + mock_validate_accessing_patient_data.assert_called_once_with( + [os.path.basename(f.file_path) for f in staging_metadata.files], + mock_patient, + "Y12345", + ) + + assert accepted_reason == "some reason" + assert patient_ods_code == "Y12345" + + +def test_validate_entry_invalid_file_exception_triggers_write_to_dynamo( + mocker, repo_under_test +): + staging_metadata = TEST_STAGING_METADATA + + mocker.patch.object( + repo_under_test, + "validate_filenames", + side_effect=LGInvalidFilesException("invalid file"), + ) + mock_write_report = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + + accepted_reason, patient_ods_code = repo_under_test.validate_entry(staging_metadata) + + mock_write_report.assert_called_once() + args, kwargs = mock_write_report.call_args + assert args[1] == UploadStatus.FAILED + assert "invalid file" in args[2] + + assert accepted_reason is None + assert patient_ods_code is None + + +def test_validate_entry_patient_record_exists_exception(mocker, repo_under_test): + staging_metadata = TEST_STAGING_METADATA + + mocker.patch.object( + repo_under_test, + "validate_filenames", + side_effect=PatientRecordAlreadyExistException("record exists"), + ) + mock_write_report = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + + accepted_reason, patient_ods_code = repo_under_test.validate_entry(staging_metadata) + + mock_write_report.assert_called_once() + args, kwargs = mock_write_report.call_args + assert args[1] == UploadStatus.FAILED + assert "record exists" in args[2] + + assert accepted_reason is None + assert patient_ods_code is None + + +def test_validate_entry_invalid_nhs_number_exception(mocker, repo_under_test): + staging_metadata = TEST_STAGING_METADATA + + mocker.patch.object( + repo_under_test, + "validate_filenames", + side_effect=InvalidNhsNumberException("bad nhs"), + ) + mock_write_report = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + + accepted_reason, patient_ods_code = repo_under_test.validate_entry(staging_metadata) + + mock_write_report.assert_called_once() + args, kwargs = mock_write_report.call_args + assert "bad nhs" in args[2] + + assert accepted_reason is None + assert patient_ods_code is None + + +def test_validate_accessing_patient_data_returns_none_when_pds_fhir_always_true( + repo_under_test, mock_patient +): + repo_under_test.pds_fhir_always_true = True + + result = repo_under_test.validate_accessing_patient_data( + ["file.pdf"], mock_patient, "A1234" + ) + + assert result is None + + +def test_validate_accessing_patient_data_strict_mode_calls_strict_validation( + mocker, repo_under_test, mock_patient +): + mock_validate = mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_strict", + return_value=False, + ) + mock_allowed = mocker.patch( + "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", + return_value=True, + ) + + result = repo_under_test.validate_accessing_patient_data( + ["file.pdf"], mock_patient, "A1234" + ) + + mock_validate.assert_called_once() + mock_allowed.assert_called_once() + assert result is None + + +@pytest.fixture +def lenient_repo(set_env, mocker): # 👈 include set_env + service = BulkUploadService(strict_mode=False) + mocker.patch.object(service, "dynamo_repository") + mocker.patch.object(service, "sqs_repository") + mocker.patch.object(service, "bulk_upload_s3_repository") + return service + + +def test_validate_accessing_patient_data_lenient_mode_calls_lenient_validation( + mocker, lenient_repo, mock_patient +): + mock_validate = mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return_value=("some reason", False), + ) + mock_allowed = mocker.patch( + "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", + return_value=True, + ) + + result = lenient_repo.validate_accessing_patient_data( + ["file.pdf"], mock_patient, "A1234" + ) + + mock_validate.assert_called_once() + mock_allowed.assert_called_once() + assert "some reason" in result + + +def test_validate_accessing_patient_data_adds_historic_name_reason_when_flag_true( + mocker, lenient_repo, mock_patient +): + mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return_value=("some reason", True), + ) + mocker.patch( + "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", return_value=True + ) + + result = lenient_repo.validate_accessing_patient_data( + ["file.pdf"], mock_patient, "A1234" + ) + + assert "some reason" in result + assert "Patient matched on historical name" in result + + +def test_validate_accessing_patient_data_raises_exception_when_ods_code_not_allowed( + mocker, lenient_repo, mock_patient +): + mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return_value=("some reason", False), + ) + mocker.patch( + "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", + return_value=False, + ) + + with pytest.raises( + LGInvalidFilesException, match="Patient not registered at your practice" + ): + lenient_repo.validate_accessing_patient_data( + ["file.pdf"], mock_patient, "A1234" + ) + + +def test_validate_accessing_patient_data_adds_deceased_reason( + mocker, lenient_repo, mock_patient +): + mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return_value=("some reason", False), + ) + mocker.patch( + "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", return_value=True + ) + + deceased_status_mock = mocker.Mock() + deceased_status_mock.name = "Formal" + mock_patient.get_death_notification_status.return_value = deceased_status_mock + + result = lenient_repo.validate_accessing_patient_data( + ["file.pdf"], mock_patient, "A1234" + ) + + assert "some reason" in result + assert "Patient is deceased - Formal" in result + + +def test_validate_accessing_patient_data_adds_restricted_reason( + mocker, lenient_repo, mock_patient +): + mocker.patch( + "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return_value=("some reason", False), + ) + mocker.patch( + "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", return_value=True + ) + + result = lenient_repo.validate_accessing_patient_data( + ["file.pdf"], mock_patient, PatientOdsInactiveStatus.RESTRICTED + ) + + assert "some reason" in result + assert "PDS record is restricted" in result From 1017cf3bf7fe095e54d4de346a90e9f1b9a4ff2f Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 21 Aug 2025 13:28:39 +0100 Subject: [PATCH 04/12] [PRMT-580]- refactored virus scan --- lambdas/services/bulk_upload_service_v2.py | 99 +++++++-------- .../services/test_bulk_upload_service_v2.py | 113 ++++++++++++++++++ 2 files changed, 163 insertions(+), 49 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 5d33af125..4822d54e8 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -224,6 +224,55 @@ def validate_entry( return None, None + def validate_virus_scan( + self, staging_metadata: StagingMetadata, patient_ods_code: str + ) -> bool: + try: + self.resolve_source_file_path(staging_metadata) + self.bulk_upload_s3_repository.check_virus_result( + staging_metadata, self.file_path_cache + ) + return True + except VirusScanNoResultException as e: + logger.info(e) + logger.info( + f"Waiting on virus scan results for: {staging_metadata.nhs_number}, adding message back to queue" + ) + if staging_metadata.retries > 14: + err = ( + "File was not scanned for viruses before maximum retries attempted" + ) + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, UploadStatus.FAILED, err, patient_ods_code + ) + else: + self.sqs_repository.put_staging_metadata_back_to_queue(staging_metadata) + return False + except (VirusScanFailedException, DocumentInfectedException) as e: + logger.info(e) + logger.info( + f"Virus scan results check failed for: {staging_metadata.nhs_number}, removing from queue" + ) + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "One or more of the files failed virus scanner check", + patient_ods_code, + ) + return False + except S3FileNotFoundException as e: + logger.info(e) + logger.info( + f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}" + ) + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "One or more of the files is not accessible from staging bucket", + patient_ods_code, + ) + return False + # def handle_sqs_message_v2(self, message: dict): # logger.info("validate SQS event") # staging_metadata = self.build_staging_metadata_from_message(message) @@ -276,56 +325,8 @@ def handle_sqs_message(self, message: dict): "Validated strick mode, and if we can access the patient information ex:patient dead" " Checking virus scan has marked files as Clean" ) - - try: - self.resolve_source_file_path(staging_metadata) - self.bulk_upload_s3_repository.check_virus_result( - staging_metadata, self.file_path_cache - ) - except VirusScanNoResultException as e: - logger.info(e) - logger.info( - f"Waiting on virus scan results for: {staging_metadata.nhs_number}, adding message back to queue" - ) - if staging_metadata.retries > 14: - err = ( - "File was not scanned for viruses before maximum retries attempted" - ) - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, UploadStatus.FAILED, err, patient_ods_code - ) - else: - self.sqs_repository.put_staging_metadata_back_to_queue(staging_metadata) - return - except (VirusScanFailedException, DocumentInfectedException) as e: - logger.info(e) - logger.info( - f"Virus scan results check failed for: {staging_metadata.nhs_number}, removing from queue" - ) - logger.info("Will stop processing Lloyd George record for this patient") - - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, - UploadStatus.FAILED, - "One or more of the files failed virus scanner check", - patient_ods_code, - ) - return - except S3FileNotFoundException as e: - logger.info(e) - logger.info( - f"One or more of the files is not accessible from S3 bucket for patient {staging_metadata.nhs_number}" - ) - logger.info("Will stop processing Lloyd George record for this patient") - - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, - UploadStatus.FAILED, - "One or more of the files is not accessible from staging bucket", - patient_ods_code, - ) + if not self.validate_virus_scan(staging_metadata, patient_ods_code): return - logger.info("Virus result validation complete. Initialising transaction") self.bulk_upload_s3_repository.init_transaction() diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index ad77a2ed5..47e554dfe 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -51,6 +51,7 @@ PatientRecordAlreadyExistException, PdsTooManyRequestsException, S3FileNotFoundException, + VirusScanFailedException, VirusScanNoResultException, ) from utils.lloyd_george_validator import LGInvalidFilesException @@ -1485,3 +1486,115 @@ def test_validate_accessing_patient_data_adds_restricted_reason( assert "some reason" in result assert "PDS record is restricted" in result + + +def test_virus_scan_success(repo_under_test, mocker): + mocker.patch.object(repo_under_test, "resolve_source_file_path") + mock_check = mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, "check_virus_result" + ) + + result = repo_under_test.validate_virus_scan(TEST_STAGING_METADATA, "ODS123") + + assert result is True + mock_check.assert_called_once_with( + TEST_STAGING_METADATA, repo_under_test.file_path_cache + ) + + +def test_virus_scan_no_result_max_retries(repo_under_test, mocker): + metadata = copy(TEST_STAGING_METADATA) + metadata.retries = 15 + + mocker.patch.object(repo_under_test, "resolve_source_file_path") + mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "check_virus_result", + side_effect=VirusScanNoResultException("no result"), + ) + + result = repo_under_test.validate_virus_scan(metadata, "ODS123") + + assert result is False + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called_once_with( + metadata, + UploadStatus.FAILED, + "File was not scanned for viruses before maximum retries attempted", + "ODS123", + ) + + +def test_virus_scan_no_result_retries_remaining(repo_under_test, mocker): + metadata = copy(TEST_STAGING_METADATA) + metadata.retries = 5 + + mocker.patch.object(repo_under_test, "resolve_source_file_path") + mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "check_virus_result", + side_effect=VirusScanNoResultException("no result"), + ) + + result = repo_under_test.validate_virus_scan(metadata, "ODS123") + + assert result is False + repo_under_test.sqs_repository.put_staging_metadata_back_to_queue.assert_called_once_with( + metadata + ) + + +def test_virus_scan_failed_exception(repo_under_test, mocker): + mocker.patch.object(repo_under_test, "resolve_source_file_path") + mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "check_virus_result", + side_effect=VirusScanFailedException("fail"), + ) + + result = repo_under_test.validate_virus_scan(TEST_STAGING_METADATA, "ODS123") + + assert result is False + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called_once_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "One or more of the files failed virus scanner check", + "ODS123", + ) + + +def test_virus_scan_document_infected_exception(repo_under_test, mocker): + mocker.patch.object(repo_under_test, "resolve_source_file_path") + mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "check_virus_result", + side_effect=DocumentInfectedException("infected"), + ) + + result = repo_under_test.validate_virus_scan(TEST_STAGING_METADATA, "ODS123") + + assert result is False + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called_once_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "One or more of the files failed virus scanner check", + "ODS123", + ) + + +def test_virus_scan_file_not_found(repo_under_test, mocker): + mocker.patch.object(repo_under_test, "resolve_source_file_path") + mocker.patch.object( + repo_under_test.bulk_upload_s3_repository, + "check_virus_result", + side_effect=S3FileNotFoundException("missing"), + ) + + result = repo_under_test.validate_virus_scan(TEST_STAGING_METADATA, "ODS123") + + assert result is False + repo_under_test.dynamo_repository.write_report_upload_to_dynamo.assert_called_once_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "One or more of the files is not accessible from staging bucket", + "ODS123", + ) From a7374875fe22481216409dcc20cd39f43edee456 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 21 Aug 2025 13:42:58 +0100 Subject: [PATCH 05/12] [PRMT-580]- refactored initiate transaction --- lambdas/services/bulk_upload_service_v2.py | 12 ++++++------ .../unit/services/test_bulk_upload_service_v2.py | 7 +++++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 4822d54e8..75a481e92 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -273,6 +273,11 @@ def validate_virus_scan( ) return False + def initiate_transactions(self): + self.bulk_upload_s3_repository.init_transaction() + self.dynamo_repository.init_transaction() + logger.info("Transaction initialised.") + # def handle_sqs_message_v2(self, message: dict): # logger.info("validate SQS event") # staging_metadata = self.build_staging_metadata_from_message(message) @@ -329,12 +334,7 @@ def handle_sqs_message(self, message: dict): return logger.info("Virus result validation complete. Initialising transaction") - self.bulk_upload_s3_repository.init_transaction() - self.dynamo_repository.init_transaction() - - logger.info( - "Transaction initialised. Transferring files to main S3 bucket and creating metadata" - ) + self.initiate_transactions() try: self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index 47e554dfe..8383a0d54 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -1598,3 +1598,10 @@ def test_virus_scan_file_not_found(repo_under_test, mocker): "One or more of the files is not accessible from staging bucket", "ODS123", ) + + +def test_initiate_transactions_calls_repos(repo_under_test): + repo_under_test.initiate_transactions() + + repo_under_test.bulk_upload_s3_repository.init_transaction.assert_called_once() + repo_under_test.dynamo_repository.init_transaction.assert_called_once() From 78a2f25669570f5e2a25fce357d4e41c79ffab6d Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Fri, 22 Aug 2025 09:19:25 +0100 Subject: [PATCH 06/12] [PRMT-580]- refactored transfer files --- lambdas/services/bulk_upload_service_v2.py | 48 +++++++++++-------- .../services/test_bulk_upload_service_v2.py | 36 ++++++++++++++ 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 75a481e92..7feb3b181 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -278,6 +278,30 @@ def initiate_transactions(self): self.dynamo_repository.init_transaction() logger.info("Transaction initialised.") + def transfer_files(self, staging_metadata, patient_ods_code) -> bool: + try: + self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) + logger.info( + f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}", + {"Result": "Successful upload"}, + ) + return True + except ClientError as e: + logger.info( + f"Got unexpected error during file transfer: {str(e)}", + {"Result": "Unsuccessful upload"}, + ) + logger.info("Will try to rollback any change to database and bucket") + self.rollback_transaction() + + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "Validation passed but error occurred during file transfer", + patient_ods_code, + ) + return False + # def handle_sqs_message_v2(self, message: dict): # logger.info("validate SQS event") # staging_metadata = self.build_staging_metadata_from_message(message) @@ -336,31 +360,13 @@ def handle_sqs_message(self, message: dict): self.initiate_transactions() - try: - self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) - logger.info( - f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}", - {"Result": "Successful upload"}, - ) - except ClientError as e: - logger.info( - f"Got unexpected error during file transfer: {str(e)}", - {"Result": "Unsuccessful upload"}, - ) - logger.info("Will try to rollback any change to database and bucket") - self.rollback_transaction() - - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, - UploadStatus.FAILED, - "Validation passed but error occurred during file transfer", - patient_ods_code, - ) + logger.info("Transferring files and creating metadata") + if not self.transfer_files(staging_metadata, patient_ods_code): return - logger.info( "File transfer complete. Removing uploaded files from staging bucket" ) + self.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket() logger.info( diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index 8383a0d54..a9d4799e2 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -1605,3 +1605,39 @@ def test_initiate_transactions_calls_repos(repo_under_test): repo_under_test.bulk_upload_s3_repository.init_transaction.assert_called_once() repo_under_test.dynamo_repository.init_transaction.assert_called_once() + + +def test_transfer_files_success(repo_under_test, mocker): + mock_create = mocker.patch.object( + repo_under_test, "create_lg_records_and_copy_files" + ) + + result = repo_under_test.transfer_files(TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS) + + assert result is True + mock_create.assert_called_once_with(TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS) + + +def test_transfer_files_client_error_triggers_rollback(repo_under_test, mocker): + mocker.patch.object( + repo_under_test, + "create_lg_records_and_copy_files", + side_effect=ClientError( + {"Error": {"Code": "500", "Message": "Something failed"}}, "CopyObject" + ), + ) + mock_rollback = mocker.patch.object(repo_under_test, "rollback_transaction") + mock_write_report = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + + result = repo_under_test.transfer_files(TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS) + + assert result is False + mock_rollback.assert_called_once() + mock_write_report.assert_called_once_with( + TEST_STAGING_METADATA, + UploadStatus.FAILED, + "Validation passed but error occurred during file transfer", + TEST_CURRENT_GP_ODS, + ) From 60ed752bbba1619cc413ef3cf5991759d55b4b37 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Fri, 22 Aug 2025 15:59:05 +0100 Subject: [PATCH 07/12] [PRMT-580]- refactored add information to stiching queue and updated tests --- lambdas/services/bulk_upload_service_v2.py | 59 ++---- .../services/test_bulk_upload_service_v2.py | 184 +++++++++++------- 2 files changed, 123 insertions(+), 120 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 7feb3b181..06e9a8a63 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -302,44 +302,17 @@ def transfer_files(self, staging_metadata, patient_ods_code) -> bool: ) return False - # def handle_sqs_message_v2(self, message: dict): - # logger.info("validate SQS event") - # staging_metadata = self.build_staging_metadata_from_message(message) - # logger.info("SQS event is valid. Validating NHS number and file names") - # - # accepted_reason, patient_ods_code = self.validate_entry(staging_metadata) - # if accepted_reason is None: - # return - # - # logger.info( - # "NHS Number and filename validation complete." - # "Validated strick mode, and if we can access the patient information ex:patient dead" - # " Checking virus scan has marked files as Clean" - # ) - # - # if not self.validate_virus_scan(staging_metadata, patient_ods_code): - # return - # logger.info("Virus result validation complete. Initialising transaction") - # - # self.initiate_transactions() - # logger.info("Transferring files and creating metadata") - # if not self.transfer_files(staging_metadata, patient_ods_code): - # return - # logger.info("File transfer complete. Removing uploaded files from staging bucket") - # self.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket() - # - # logger.info( - # f"Completed file ingestion for patient {staging_metadata.nhs_number}", - # {"Result": "Successful upload"}, - # ) - # logger.info("Reporting transaction successful") - # self.dynamo_repository.write_report_upload_to_dynamo( - # staging_metadata, - # UploadStatus.COMPLETE, - # accepted_reason, - # patient_ods_code, - # ) - # self.add_information_to_stitching_queue(staging_metadata, patient_ods_code, accepted_reason) + def add_information_to_stitching_queue( + self, staging_metadata, patient_ods_code, accepted_reason + ): + pdf_stitching_sqs_message = PdfStitchingSqsMessage( + nhs_number=staging_metadata.nhs_number, + snomed_code_doc_type=SnomedCodes.LLOYD_GEORGE.value, + ) + self.sqs_repository.send_message_to_pdf_stitching_queue( + queue_url=self.pdf_stitching_queue_url, + message=pdf_stitching_sqs_message, + ) def handle_sqs_message(self, message: dict): logger.info("validate SQS event") @@ -381,14 +354,10 @@ def handle_sqs_message(self, message: dict): patient_ods_code, ) - pdf_stitching_sqs_message = PdfStitchingSqsMessage( - nhs_number=staging_metadata.nhs_number, - snomed_code_doc_type=SnomedCodes.LLOYD_GEORGE.value, - ) - self.sqs_repository.send_message_to_pdf_stitching_queue( - queue_url=self.pdf_stitching_queue_url, - message=pdf_stitching_sqs_message, + self.add_information_to_stitching_queue( + staging_metadata, patient_ods_code, accepted_reason ) + logger.info( f"Message sent to stitching queue for patient {staging_metadata.nhs_number}" ) diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index a9d4799e2..7f3f23b0f 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -3,6 +3,7 @@ from copy import copy import pytest +import services.bulk_upload_service_v2 as bulk_upload_module from botocore.exceptions import ClientError from enums.patient_ods_inactive_status import PatientOdsInactiveStatus from enums.upload_status import UploadStatus @@ -75,14 +76,15 @@ def mock_check_virus_result(mocker): @pytest.fixture def mock_validate_files(mocker): - yield mocker.patch("services.bulk_upload_service_v2.validate_lg_file_names") + return mocker.patch.object(bulk_upload_module, "validate_lg_file_names") @pytest.fixture def mock_pds_service(mocker): patient = Patient.model_validate(PDS_PATIENT) - mocker.patch( - "services.bulk_upload_service_v2.getting_patient_info_from_pds", + mocker.patch.object( + bulk_upload_module, + "getting_patient_info_from_pds", return_value=patient, ) yield patient @@ -91,8 +93,9 @@ def mock_pds_service(mocker): @pytest.fixture def mock_pds_service_patient_deceased_formal(mocker): patient = Patient.model_validate(PDS_PATIENT_DECEASED_FORMAL) - mocker.patch( - "services.bulk_upload_service_v2.getting_patient_info_from_pds", + mocker.patch.object( + bulk_upload_module, + "getting_patient_info_from_pds", return_value=patient, ) yield patient @@ -101,8 +104,9 @@ def mock_pds_service_patient_deceased_formal(mocker): @pytest.fixture def mock_pds_service_patient_deceased_informal(mocker): patient = Patient.model_validate(PDS_PATIENT_DECEASED_INFORMAL) - mocker.patch( - "services.bulk_upload_service_v2.getting_patient_info_from_pds", + mocker.patch.object( + bulk_upload_module, + "getting_patient_info_from_pds", return_value=patient, ) yield patient @@ -111,31 +115,44 @@ def mock_pds_service_patient_deceased_informal(mocker): @pytest.fixture def mock_pds_service_patient_restricted(mocker): patient = Patient.model_validate(PDS_PATIENT_RESTRICTED) - mocker.patch( - "services.bulk_upload_service_v2.getting_patient_info_from_pds", + mocker.patch.object( + bulk_upload_module, + "getting_patient_info_from_pds", return_value=patient, ) - yield patient + return patient @pytest.fixture def mock_pds_validation_lenient(mocker): - yield mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + return mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_lenient", return_value=("test string", True), ) @pytest.fixture def mock_pds_validation_strict(mocker): - yield mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_strict", + return mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_strict", ) @pytest.fixture def mock_ods_validation(mocker): - yield mocker.patch("services.bulk_upload_service_v2.allowed_to_ingest_ods_code") + return mocker.patch.object(bulk_upload_module, "allowed_to_ingest_ods_code") + + +@pytest.fixture +def mock_ods_validation_true(mocker): + patcher = mocker.patch.object( + bulk_upload_module, + "allowed_to_ingest_ods_code", + return_value=True, + ) + yield patcher @pytest.fixture @@ -1158,7 +1175,7 @@ def test_handle_sqs_message_happy_path_v2(mocker, repo_under_test): repo_under_test, "add_information_to_stitching_queue" ) - repo_under_test.handle_sqs_message_v2(TEST_SQS_MESSAGE) + repo_under_test.handle_sqs_message(TEST_SQS_MESSAGE) mock_staging_metadata.assert_called_once_with(TEST_SQS_MESSAGE) mock_validate_entry.assert_called_once_with(mock_metadata) @@ -1220,12 +1237,9 @@ def test_validate_filenames(repo_under_test, mocker): } staging_metadata = StagingMetadata.parse_obj(staging_metadata_data) - mock_validate_nhs = mocker.patch( - "services.bulk_upload_service_v2.validate_nhs_number" - ) - mock_validate_lg = mocker.patch( - "services.bulk_upload_service_v2.validate_lg_file_names" - ) + mock_validate_nhs = mocker.patch.object(bulk_upload_module, "validate_nhs_number") + + mock_validate_lg = mocker.patch.object(bulk_upload_module, "validate_lg_file_names") repo_under_test.validate_filenames(staging_metadata) @@ -1247,8 +1261,8 @@ def test_validate_entry_happy_path(mocker, repo_under_test, mock_patient): staging_metadata = TEST_STAGING_METADATA mock_validate_filenames = mocker.patch.object(repo_under_test, "validate_filenames") - mock_getting_patient_info_from_pds = mocker.patch( - "services.bulk_upload_service_v2.getting_patient_info_from_pds" + mock_getting_patient_info_from_pds = mocker.patch.object( + bulk_upload_module, "getting_patient_info_from_pds" ) mock_patient = mocker.Mock() mock_patient.get_ods_code_or_inactive_status_for_gp.return_value = "Y12345" @@ -1357,23 +1371,20 @@ def test_validate_accessing_patient_data_returns_none_when_pds_fhir_always_true( def test_validate_accessing_patient_data_strict_mode_calls_strict_validation( - mocker, repo_under_test, mock_patient + mocker, repo_under_test, mock_patient, mock_ods_validation_true ): - mock_validate = mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_strict", + mock_validate = mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_strict", return_value=False, ) - mock_allowed = mocker.patch( - "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", - return_value=True, - ) result = repo_under_test.validate_accessing_patient_data( ["file.pdf"], mock_patient, "A1234" ) mock_validate.assert_called_once() - mock_allowed.assert_called_once() + mock_ods_validation_true.assert_called_once() assert result is None @@ -1386,37 +1397,37 @@ def lenient_repo(set_env, mocker): # 👈 include set_env return service -def test_validate_accessing_patient_data_lenient_mode_calls_lenient_validation( - mocker, lenient_repo, mock_patient -): - mock_validate = mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", +@pytest.fixture +def mock_validate_lenient(mocker): + patcher = mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_lenient", return_value=("some reason", False), ) - mock_allowed = mocker.patch( - "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", - return_value=True, - ) + yield patcher + + +def test_validate_accessing_patient_data_lenient_mode_calls_lenient_validation( + lenient_repo, mock_patient, mock_validate_lenient, mock_ods_validation_true +): result = lenient_repo.validate_accessing_patient_data( ["file.pdf"], mock_patient, "A1234" ) - mock_validate.assert_called_once() - mock_allowed.assert_called_once() + mock_validate_lenient.assert_called_once() + mock_ods_validation_true.assert_called_once() assert "some reason" in result def test_validate_accessing_patient_data_adds_historic_name_reason_when_flag_true( - mocker, lenient_repo, mock_patient + mocker, lenient_repo, mock_patient, mock_ods_validation_true ): - mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", + mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_lenient", return_value=("some reason", True), ) - mocker.patch( - "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", return_value=True - ) result = lenient_repo.validate_accessing_patient_data( ["file.pdf"], mock_patient, "A1234" @@ -1427,17 +1438,9 @@ def test_validate_accessing_patient_data_adds_historic_name_reason_when_flag_tru def test_validate_accessing_patient_data_raises_exception_when_ods_code_not_allowed( - mocker, lenient_repo, mock_patient + lenient_repo, mock_patient, mock_validate_lenient, mock_ods_validation ): - mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", - return_value=("some reason", False), - ) - mocker.patch( - "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", - return_value=False, - ) - + mock_ods_validation.return_value = False with pytest.raises( LGInvalidFilesException, match="Patient not registered at your practice" ): @@ -1447,15 +1450,8 @@ def test_validate_accessing_patient_data_raises_exception_when_ods_code_not_allo def test_validate_accessing_patient_data_adds_deceased_reason( - mocker, lenient_repo, mock_patient + mocker, lenient_repo, mock_patient, mock_validate_lenient, mock_ods_validation_true ): - mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", - return_value=("some reason", False), - ) - mocker.patch( - "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", return_value=True - ) deceased_status_mock = mocker.Mock() deceased_status_mock.name = "Formal" @@ -1470,16 +1466,13 @@ def test_validate_accessing_patient_data_adds_deceased_reason( def test_validate_accessing_patient_data_adds_restricted_reason( - mocker, lenient_repo, mock_patient + mocker, lenient_repo, mock_patient, mock_validate_lenient ): - mocker.patch( - "services.bulk_upload_service_v2.validate_filename_with_patient_details_lenient", - return_value=("some reason", False), - ) - mocker.patch( - "services.bulk_upload_service_v2.allowed_to_ingest_ods_code", return_value=True + mocker.patch.object( + bulk_upload_module, + "allowed_to_ingest_ods_code", + return_value=True, ) - result = lenient_repo.validate_accessing_patient_data( ["file.pdf"], mock_patient, PatientOdsInactiveStatus.RESTRICTED ) @@ -1641,3 +1634,44 @@ def test_transfer_files_client_error_triggers_rollback(repo_under_test, mocker): "Validation passed but error occurred during file transfer", TEST_CURRENT_GP_ODS, ) + + +def test_add_information_to_stitching_queue(repo_under_test, mocker): + mock_send = mocker.patch.object( + repo_under_test.sqs_repository, "send_message_to_pdf_stitching_queue" + ) + + repo_under_test.add_information_to_stitching_queue( + TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS, accepted_reason="Some reason" + ) + + mock_send.assert_called_once() + args, kwargs = mock_send.call_args + + assert kwargs["queue_url"] == repo_under_test.pdf_stitching_queue_url + + message = kwargs["message"] + assert message.nhs_number == TEST_STAGING_METADATA.nhs_number + assert message.snomed_code_doc_type.code == "16521000000101" + assert message.snomed_code_doc_type.display_name == "Lloyd George record folder" + + +def test_add_information_to_stitching_queue_calls_send_with_correct_values( + repo_under_test, mocker +): + mock_send = mocker.patch.object( + repo_under_test.sqs_repository, "send_message_to_pdf_stitching_queue" + ) + + repo_under_test.add_information_to_stitching_queue( + TEST_STAGING_METADATA, TEST_CURRENT_GP_ODS, accepted_reason="Some reason" + ) + + mock_send.assert_called_once() + args, kwargs = mock_send.call_args + + assert kwargs["queue_url"] == repo_under_test.pdf_stitching_queue_url + + message = kwargs["message"] + assert message.nhs_number == TEST_STAGING_METADATA.nhs_number + assert message.snomed_code_doc_type.code == "16521000000101" From 336b9ccd26d23f0078cdc3f8e1edbf2f1838ff9d Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Wed, 27 Aug 2025 16:51:06 +0100 Subject: [PATCH 08/12] [PRMT-580]- refactored process_message_queue and addressed comments --- lambdas/services/bulk_upload_service_v2.py | 401 +++++++++--------- .../services/test_bulk_upload_service_v2.py | 342 ++++++++++++++- 2 files changed, 536 insertions(+), 207 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 06e9a8a63..91ba2d05a 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -50,62 +50,86 @@ class BulkUploadService: - def __init__(self, strict_mode, pds_fhir_always_true=False): + def __init__(self, strict_mode, bypass_pds=False): self.dynamo_repository = BulkUploadDynamoRepository() self.sqs_repository = BulkUploadSqsRepository() self.bulk_upload_s3_repository = BulkUploadS3Repository() self.strict_mode = strict_mode - self.pdf_content_type = "application/pdf" self.unhandled_messages = [] self.file_path_cache = {} self.pdf_stitching_queue_url = os.environ["PDF_STITCHING_SQS_URL"] - self.pds_fhir_always_true = pds_fhir_always_true + self.bypass_pds = bypass_pds def process_message_queue(self, records: list): for index, message in enumerate(records, start=1): + logger.info(f"Processing message {index} of {len(records)}") + try: - logger.info(f"Processing message {index} of {len(records)}") self.handle_sqs_message(message) - except (PdsTooManyRequestsException, PdsErrorException) as error: - logger.error(error) - - logger.info( - "Cannot validate patient due to PDS responded with Too Many Requests" - ) - logger.info("Cannot process for now due to PDS rate limit reached.") - logger.info( - "All remaining messages in this batch will be returned to sqs queue to retry later." - ) - all_unprocessed_message = records[index - 1 :] - for unprocessed_message in all_unprocessed_message: - self.sqs_repository.put_sqs_message_back_to_queue( - unprocessed_message - ) + except (PdsTooManyRequestsException, PdsErrorException) as error: + self.handle_process_message_pds_error(records, index, error) raise BulkUploadException( "Bulk upload process paused due to PDS rate limit reached" ) + except ( ClientError, InvalidMessageException, LGInvalidFilesException, Exception, ) as error: - self.unhandled_messages.append(message) - logger.info(f"Failed to process current message due to error: {error}") - logger.info("Continue on next message") + self.handle_process_message_general_error(message, error) + + self.log_processing_summary(records) + + def handle_sqs_message(self, message: dict): + logger.info("validate SQS event") + staging_metadata = self.build_staging_metadata_from_message(message) + logger.info("SQS event is valid. Validating NHS number and file names") + accepted_reason, patient_ods_code = self.validate_entry(staging_metadata) + if accepted_reason is None: + return logger.info( - f"Finish Processing successfully {len(records) - len(self.unhandled_messages)} of {len(records)} messages" + "NHS Number and filename validation complete." + "Validated strick mode, and if we can access the patient information ex:patient dead" + " Checking if virus scan has marked files as Clean" + ) + if not self.validate_virus_scan(staging_metadata, patient_ods_code): + return + logger.info("Virus result validation complete. Initialising transaction") + + self.initiate_transactions() + + logger.info("Transferring files and creating metadata") + if not self.transfer_files(staging_metadata, patient_ods_code): + return + logger.info( + "File transfer complete. Removing uploaded files from staging bucket" + ) + + self.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket() + + logger.info( + f"Completed file ingestion for patient {staging_metadata.nhs_number}", + {"Result": "Successful upload"}, + ) + logger.info("Reporting transaction successful") + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.COMPLETE, + accepted_reason, + patient_ods_code, + ) + + self.add_information_to_stitching_queue( + staging_metadata, patient_ods_code, accepted_reason + ) + + logger.info( + f"Message sent to stitching queue for patient {staging_metadata.nhs_number}" ) - if self.unhandled_messages: - logger.info("Unable to process the following messages:") - for message in self.unhandled_messages: - message_body = json.loads(message.get("body", "{}")) - request_context.patient_nhs_no = message_body.get( - "NHS-NO", "no number found" - ) - logger.info(message_body) def build_staging_metadata_from_message(self, message: dict) -> StagingMetadata: logger.info("Validating SQS event") @@ -117,71 +141,12 @@ def build_staging_metadata_from_message(self, message: dict) -> StagingMetadata: logger.error(e) raise InvalidMessageException(str(e)) - def validate_filenames(self, staging_metadata: StagingMetadata): - file_names = [ - os.path.basename(metadata.file_path) for metadata in staging_metadata.files - ] - request_context.patient_nhs_no = staging_metadata.nhs_number - validate_nhs_number(staging_metadata.nhs_number) - validate_lg_file_names(file_names, staging_metadata.nhs_number) - - def validate_accessing_patient_data( - self, - file_names: list[str], - pds_patient_details, - patient_ods_code: str, - ) -> str | None: - - if self.pds_fhir_always_true: - return None - accepted_reason = None - - if self.strict_mode: - is_name_validation_based_on_historic_name = ( - validate_filename_with_patient_details_strict( - file_names, pds_patient_details - ) - ) - else: - ( - name_validation_accepted_reason, - is_name_validation_based_on_historic_name, - ) = validate_filename_with_patient_details_lenient( - file_names, pds_patient_details - ) - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, name_validation_accepted_reason - ) - - if is_name_validation_based_on_historic_name: - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, "Patient matched on historical name" - ) - if not allowed_to_ingest_ods_code(patient_ods_code): - raise LGInvalidFilesException("Patient not registered at your practice") - patient_death_notification_status = ( - pds_patient_details.get_death_notification_status() - ) - if patient_death_notification_status: - deceased_accepted_reason = ( - f"Patient is deceased - {patient_death_notification_status.name}" - ) - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, deceased_accepted_reason - ) - if patient_ods_code is PatientOdsInactiveStatus.RESTRICTED: - accepted_reason = self.concatenate_acceptance_reason( - accepted_reason, "PDS record is restricted" - ) - - return accepted_reason - def validate_entry( self, staging_metadata: StagingMetadata ) -> tuple[str | None, str | None]: patient_ods_code = "" try: - self.validate_filenames(staging_metadata) + self.validate_staging_metadata_filenames(staging_metadata) file_names = [ os.path.basename(metadata.file_path) for metadata in staging_metadata.files @@ -195,7 +160,7 @@ def validate_entry( pds_patient_details.get_ods_code_or_inactive_status_for_gp() ) - accepted_reason = self.validate_accessing_patient_data( + accepted_reason = self.validate_patient_data_access_conditions( file_names, pds_patient_details, patient_ods_code, @@ -273,95 +238,6 @@ def validate_virus_scan( ) return False - def initiate_transactions(self): - self.bulk_upload_s3_repository.init_transaction() - self.dynamo_repository.init_transaction() - logger.info("Transaction initialised.") - - def transfer_files(self, staging_metadata, patient_ods_code) -> bool: - try: - self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) - logger.info( - f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}", - {"Result": "Successful upload"}, - ) - return True - except ClientError as e: - logger.info( - f"Got unexpected error during file transfer: {str(e)}", - {"Result": "Unsuccessful upload"}, - ) - logger.info("Will try to rollback any change to database and bucket") - self.rollback_transaction() - - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, - UploadStatus.FAILED, - "Validation passed but error occurred during file transfer", - patient_ods_code, - ) - return False - - def add_information_to_stitching_queue( - self, staging_metadata, patient_ods_code, accepted_reason - ): - pdf_stitching_sqs_message = PdfStitchingSqsMessage( - nhs_number=staging_metadata.nhs_number, - snomed_code_doc_type=SnomedCodes.LLOYD_GEORGE.value, - ) - self.sqs_repository.send_message_to_pdf_stitching_queue( - queue_url=self.pdf_stitching_queue_url, - message=pdf_stitching_sqs_message, - ) - - def handle_sqs_message(self, message: dict): - logger.info("validate SQS event") - staging_metadata = self.build_staging_metadata_from_message(message) - logger.info("SQS event is valid. Validating NHS number and file names") - accepted_reason, patient_ods_code = self.validate_entry(staging_metadata) - if accepted_reason is None: - return - - logger.info( - "NHS Number and filename validation complete." - "Validated strick mode, and if we can access the patient information ex:patient dead" - " Checking virus scan has marked files as Clean" - ) - if not self.validate_virus_scan(staging_metadata, patient_ods_code): - return - logger.info("Virus result validation complete. Initialising transaction") - - self.initiate_transactions() - - logger.info("Transferring files and creating metadata") - if not self.transfer_files(staging_metadata, patient_ods_code): - return - logger.info( - "File transfer complete. Removing uploaded files from staging bucket" - ) - - self.bulk_upload_s3_repository.remove_ingested_file_from_source_bucket() - - logger.info( - f"Completed file ingestion for patient {staging_metadata.nhs_number}", - {"Result": "Successful upload"}, - ) - logger.info("Reporting transaction successful") - self.dynamo_repository.write_report_upload_to_dynamo( - staging_metadata, - UploadStatus.COMPLETE, - accepted_reason, - patient_ods_code, - ) - - self.add_information_to_stitching_queue( - staging_metadata, patient_ods_code, accepted_reason - ) - - logger.info( - f"Message sent to stitching queue for patient {staging_metadata.nhs_number}" - ) - def resolve_source_file_path(self, staging_metadata: StagingMetadata): sample_file_path = staging_metadata.files[0].file_path @@ -404,6 +280,35 @@ def resolve_source_file_path(self, staging_metadata: StagingMetadata): self.file_path_cache = resolved_file_paths + def initiate_transactions(self): + self.bulk_upload_s3_repository.init_transaction() + self.dynamo_repository.init_transaction() + logger.info("Transaction initialised.") + + def transfer_files(self, staging_metadata, patient_ods_code) -> bool: + try: + self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) + logger.info( + f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}", + {"Result": "Successful upload"}, + ) + return True + except ClientError as e: + logger.info( + f"Got unexpected error during file transfer: {str(e)}", + {"Result": "Unsuccessful upload"}, + ) + logger.info("Will try to rollback any change to database and bucket") + self.rollback_transaction() + + self.dynamo_repository.write_report_upload_to_dynamo( + staging_metadata, + UploadStatus.FAILED, + "Validation passed but error occurred during file transfer", + patient_ods_code, + ) + return False + def create_lg_records_and_copy_files( self, staging_metadata: StagingMetadata, current_gp_ods: str ): @@ -430,16 +335,6 @@ def create_lg_records_and_copy_files( document_reference.doc_status = "final" self.dynamo_repository.create_record_in_lg_dynamo_table(document_reference) - def rollback_transaction(self): - try: - self.bulk_upload_s3_repository.rollback_transaction() - self.dynamo_repository.rollback_transaction() - logger.info("Rolled back an incomplete transaction") - except ClientError as e: - logger.error( - f"Failed to rollback the incomplete transaction due to error: {e}" - ) - def convert_to_document_reference( self, file_metadata: MetadataFile, nhs_number: str, current_gp_ods: str ) -> DocumentReference: @@ -466,6 +361,130 @@ def convert_to_document_reference( return document_reference + def rollback_transaction(self): + try: + self.bulk_upload_s3_repository.rollback_transaction() + self.dynamo_repository.rollback_transaction() + logger.info("Rolled back an incomplete transaction") + except ClientError as e: + logger.error( + f"Failed to rollback the incomplete transaction due to error: {e}" + ) + + def add_information_to_stitching_queue( + self, staging_metadata, patient_ods_code, accepted_reason + ): + pdf_stitching_sqs_message = PdfStitchingSqsMessage( + nhs_number=staging_metadata.nhs_number, + snomed_code_doc_type=SnomedCodes.LLOYD_GEORGE.value, + ) + self.sqs_repository.send_message_to_pdf_stitching_queue( + queue_url=self.pdf_stitching_queue_url, + message=pdf_stitching_sqs_message, + ) + + def validate_staging_metadata_filenames(self, staging_metadata: StagingMetadata): + file_names = [ + os.path.basename(metadata.file_path) for metadata in staging_metadata.files + ] + request_context.patient_nhs_no = staging_metadata.nhs_number + validate_nhs_number(staging_metadata.nhs_number) + validate_lg_file_names(file_names, staging_metadata.nhs_number) + + def validate_patient_data_access_conditions( + self, + file_names: list[str], + pds_patient_details, + patient_ods_code: str, + ) -> str | None: + + if self.bypass_pds: + return None + + accepted_reason = self.validate_file_name(file_names, pds_patient_details) + + if not allowed_to_ingest_ods_code(patient_ods_code): + raise LGInvalidFilesException("Patient not registered at your practice") + + accepted_reason = self.deceased_validation(accepted_reason, pds_patient_details) + accepted_reason = self.restricted_validation(accepted_reason, patient_ods_code) + + return accepted_reason + + def validate_file_name(self, file_names, pds_patient_details) -> str | None: + accepted_reason = None + + if self.strict_mode: + matched_on_history = validate_filename_with_patient_details_strict( + file_names, pds_patient_details + ) + else: + name_reason, matched_on_history = ( + validate_filename_with_patient_details_lenient( + file_names, pds_patient_details + ) + ) + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, name_reason + ) + + if matched_on_history: + accepted_reason = self.concatenate_acceptance_reason( + accepted_reason, "Patient matched on historical name" + ) + + return accepted_reason + + def deceased_validation( + self, accepted_reason: str | None, pds_patient_details + ) -> str | None: + status = pds_patient_details.get_death_notification_status() + if status: + reason = f"Patient is deceased - {status.name}" + return self.concatenate_acceptance_reason(accepted_reason, reason) + return accepted_reason + + def restricted_validation( + self, accepted_reason: str | None, patient_ods_code: str + ) -> str | None: + if patient_ods_code is PatientOdsInactiveStatus.RESTRICTED: + return self.concatenate_acceptance_reason( + accepted_reason, "PDS record is restricted" + ) + return accepted_reason + + def handle_process_message_pds_error( + self, records: list, current_index: int, error: Exception + ): + logger.error(error) + logger.info( + "Cannot validate patient due to PDS responded with Too Many Requests" + ) + logger.info("Cannot process for now due to PDS rate limit reached.") + logger.info( + "All remaining messages in this batch will be returned to SQS queue to retry later." + ) + + remaining_messages = records[current_index - 1 :] + for message in remaining_messages: + self.sqs_repository.put_sqs_message_back_to_queue(message) + + def handle_process_message_general_error(self, message, error: Exception): + self.unhandled_messages.append(message) + logger.info(f"Failed to process current message due to error: {error}") + logger.info("Continue on next message") + + def log_processing_summary(self, records: list): + processed_count = len(records) - len(self.unhandled_messages) + logger.info(f"Finished processing {processed_count} of {len(records)} messages") + + if self.unhandled_messages: + logger.info("Unable to process the following messages:") + for message in self.unhandled_messages: + body = json.loads(message.get("body", "{}")) + request_context.patient_nhs_no = body.get("NHS-NO", "no number found") + logger.info(body) + @staticmethod def strip_leading_slash(filepath: str) -> str: # Handle the filepaths irregularity in the given example of metadata.csv, diff --git a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py index 7f3f23b0f..e91a6cd6d 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service_v2.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service_v2.py @@ -50,6 +50,7 @@ InvalidMessageException, InvalidNhsNumberException, PatientRecordAlreadyExistException, + PdsErrorException, PdsTooManyRequestsException, S3FileNotFoundException, VirusScanFailedException, @@ -171,6 +172,167 @@ def build_resolved_file_names_cache( return dict(zip(file_path_in_metadata, file_path_in_s3)) +def test_process_message_queue_happy_path(mocker, repo_under_test): + message1 = {"body": json.dumps({"NHS-NO": "1234567890"})} + message2 = {"body": json.dumps({"NHS-NO": "9876543210"})} + records = [message1, message2] + + mock_handle = mocker.patch.object(repo_under_test, "handle_sqs_message") + mock_log_summary = mocker.patch.object(repo_under_test, "log_processing_summary") + + repo_under_test.process_message_queue(records) + + assert mock_handle.call_count == 2 + mock_handle.assert_any_call(message1) + mock_handle.assert_any_call(message2) + + mock_log_summary.assert_called_once_with(records) + + +@pytest.mark.parametrize( + "exception_instance", + [ + ClientError( + {"Error": {"Code": "500", "Message": "An error occurred"}}, "TestOperation" + ), + InvalidMessageException("Error occurred"), + LGInvalidFilesException("Error occurred"), + Exception("Error occurred"), + ], +) +def test_process_message_queue_general_error( + mocker, repo_under_test, exception_instance +): + message = {"body": json.dumps({"NHS-NO": "1234567890"})} + records = [message] + + mock_handle = mocker.patch.object( + repo_under_test, "handle_sqs_message", side_effect=exception_instance + ) + mock_general_error = mocker.patch.object( + repo_under_test, "handle_process_message_general_error" + ) + mock_log_summary = mocker.patch.object(repo_under_test, "log_processing_summary") + + repo_under_test.process_message_queue(records) + + mock_handle.assert_called_once_with(message) + mock_general_error.assert_called_once_with(message, exception_instance) + mock_log_summary.assert_called_once_with(records) + + +@pytest.mark.parametrize( + "pds_exception_instance", + [ + PdsTooManyRequestsException("Rate limit reached"), + PdsErrorException("PDS error occurred"), + ], +) +def test_process_message_queue_pds_error( + mocker, repo_under_test, pds_exception_instance +): + message1 = {"body": json.dumps({"NHS-NO": "1234567890"})} + message2 = {"body": json.dumps({"NHS-NO": "9876543210"})} + records = [message1, message2] + + mock_handle = mocker.patch.object( + repo_under_test, "handle_sqs_message", side_effect=pds_exception_instance + ) + mock_pds_error = mocker.patch.object( + repo_under_test, "handle_process_message_pds_error" + ) + mock_log_summary = mocker.patch.object(repo_under_test, "log_processing_summary") + + with pytest.raises( + BulkUploadException, + match="Bulk upload process paused due to PDS rate limit reached", + ): + repo_under_test.process_message_queue(records) + + mock_handle.assert_called_once_with(message1) + mock_pds_error.assert_called_once_with(records, 1, pds_exception_instance) + mock_log_summary.assert_not_called() + + +def test_handle_process_message_pds_error_calls_put_sqs_message_back_to_queue_correctly( + mocker, repo_under_test +): + msg1 = {"body": "msg1"} + msg2 = {"body": "msg2"} + msg3 = {"body": "msg3"} + records = [msg1, msg2, msg3] + + current_index = 2 + + mock_put = mocker.patch.object( + repo_under_test.sqs_repository, "put_sqs_message_back_to_queue" + ) + + error = Exception("PDS rate limit error") + + repo_under_test.handle_process_message_pds_error(records, current_index, error) + + assert mock_put.call_count == 2 + mock_put.assert_any_call(msg2) + mock_put.assert_any_call(msg3) + + +def test_handle_process_message_general_error_adds_message_to_unhandled( + mocker, repo_under_test +): + message = {"body": "test message"} + error = Exception("some error") + + assert repo_under_test.unhandled_messages == [] + + repo_under_test.handle_process_message_general_error(message, error) + + assert message in repo_under_test.unhandled_messages + + +def test_log_processing_summary_without_unhandled_messages(mocker, repo_under_test): + message1 = {"body": json.dumps({"NHS-NO": "1234567890"})} + records = [message1] + + repo_under_test.unhandled_messages = [] + + mock_logger_info = mocker.patch.object(bulk_upload_module.logger, "info") + + repo_under_test.log_processing_summary(records) + + mock_logger_info.assert_any_call( + f"Finished processing {len(records)} of {len(records)} messages" + ) + assert not any( + "Unable to process the following messages:" in str(call.args[0]) + for call in mock_logger_info.mock_calls + ) + + +def test_log_processing_summary_with_unhandled_messages(mocker, repo_under_test): + message1 = {"body": json.dumps({"NHS-NO": "1234567890"})} + message2 = {"body": json.dumps({"NHS-NO": "0987654321"})} + records = [message1, message2] + + repo_under_test.unhandled_messages = [message1, message2] + + mock_logger_info = mocker.patch.object(bulk_upload_module.logger, "info") + mock_request_context = mocker.patch.object( + bulk_upload_module, "request_context", create=True + ) + + repo_under_test.log_processing_summary(records) + + mock_logger_info.assert_any_call( + f"Finished processing 0 of {len(records)} messages" + ) + mock_logger_info.assert_any_call("Unable to process the following messages:") + mock_logger_info.assert_any_call(json.loads(message1["body"])) + mock_logger_info.assert_any_call(json.loads(message2["body"])) + + assert mock_request_context.patient_nhs_no == "0987654321" + + def test_lambda_handler_process_each_sqs_message_one_by_one( set_env, mock_handle_sqs_message ): @@ -1241,7 +1403,7 @@ def test_validate_filenames(repo_under_test, mocker): mock_validate_lg = mocker.patch.object(bulk_upload_module, "validate_lg_file_names") - repo_under_test.validate_filenames(staging_metadata) + repo_under_test.validate_staging_metadata_filenames(staging_metadata) mock_validate_nhs.assert_called_once_with(test_nhs_number) mock_validate_lg.assert_called_once_with( @@ -1257,10 +1419,158 @@ def mock_patient(mocker): return patient +def test_validate_patient_data_happy_path( + mocker, repo_under_test, mock_patient, mock_ods_validation_true +): + file_names = ["Patient_John_Doe.pdf"] + ods_code = "ODS123" + + repo_under_test.bypass_pds = False + + mock_validate_file_name = mocker.patch.object( + repo_under_test, "validate_file_name", return_value="reason: filename check" + ) + mock_deceased = mocker.patch.object( + repo_under_test, + "deceased_validation", + return_value="reason: filename check; patient deceased", + ) + mock_restricted = mocker.patch.object( + repo_under_test, + "restricted_validation", + return_value="reason: filename check; patient deceased; restricted", + ) + + result = repo_under_test.validate_patient_data_access_conditions( + file_names, mock_patient, ods_code + ) + + assert result == "reason: filename check; patient deceased; restricted" + mock_validate_file_name.assert_called_once_with(file_names, mock_patient) + mock_deceased.assert_called_once_with("reason: filename check", mock_patient) + mock_restricted.assert_called_once_with( + "reason: filename check; patient deceased", ods_code + ) + + +def test_validate_file_name_strict_mode_with_history_match( + mocker, repo_under_test, mock_patient +): + mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_strict", + return_value=True, + ) + mock_concat = mocker.patch.object( + repo_under_test, "concatenate_acceptance_reason", side_effect=lambda a, b: b + ) + + result = repo_under_test.validate_file_name(["file.pdf"], mock_patient) + + assert result == "Patient matched on historical name" + mock_concat.assert_called_once_with(None, "Patient matched on historical name") + + +def test_validate_file_name_strict_mode_with_no_history_match( + mocker, repo_under_test, mock_patient +): + mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_strict", + return_value=False, + ) + mock_concat = mocker.patch.object(repo_under_test, "concatenate_acceptance_reason") + + result = repo_under_test.validate_file_name(["file.pdf"], mock_patient) + + assert result is None + mock_concat.assert_not_called() + + +def test_validate_file_name_lenient_mode_with_history_match( + mocker, lenient_repo, mock_patient +): + mocker.patch.object( + bulk_upload_module, + "validate_filename_with_patient_details_lenient", + return_value=("some reason", True), + ) + mock_concat = mocker.patch.object( + lenient_repo, + "concatenate_acceptance_reason", + side_effect=lambda a, b: f"{a}, {b}" if a else b, + ) + + result = lenient_repo.validate_file_name(["file.pdf"], mock_patient) + + assert result == "some reason, Patient matched on historical name" + assert mock_concat.call_count == 2 + + +def test_deceased_validation_with_status(mocker, repo_under_test, mock_patient): + mock_status = mocker.Mock() + mock_status.name = "FORMAL" + mock_patient.get_death_notification_status.return_value = mock_status + + mock_concat = mocker.patch.object( + repo_under_test, + "concatenate_acceptance_reason", + return_value="existing reason; Patient is deceased - FORMAL", + ) + + result = repo_under_test.deceased_validation("existing reason", mock_patient) + + mock_concat.assert_called_once_with( + "existing reason", "Patient is deceased - FORMAL" + ) + assert result == "existing reason; Patient is deceased - FORMAL" + + +def test_deceased_validation_without_status(mocker, repo_under_test, mock_patient): + mock_patient.get_death_notification_status.return_value = None + + mock_concat = mocker.patch.object(repo_under_test, "concatenate_acceptance_reason") + + result = repo_under_test.deceased_validation("existing reason", mock_patient) + + assert result == "existing reason" + mock_concat.assert_not_called() + + +def test_restricted_validation_with_restricted_code(mocker, repo_under_test): + accepted_reason = "some reason" + patient_ods_code = PatientOdsInactiveStatus.RESTRICTED + + mock_concat = mocker.patch.object( + repo_under_test, + "concatenate_acceptance_reason", + return_value="some reason; PDS record is restricted", + ) + + result = repo_under_test.restricted_validation(accepted_reason, patient_ods_code) + + mock_concat.assert_called_once_with(accepted_reason, "PDS record is restricted") + assert result == "some reason; PDS record is restricted" + + +def test_restricted_validation_with_non_restricted_code(mocker, repo_under_test): + accepted_reason = "some reason" + patient_ods_code = "ACTIVE" + + mock_concat = mocker.patch.object(repo_under_test, "concatenate_acceptance_reason") + + result = repo_under_test.restricted_validation(accepted_reason, patient_ods_code) + + mock_concat.assert_not_called() + assert result == accepted_reason + + def test_validate_entry_happy_path(mocker, repo_under_test, mock_patient): staging_metadata = TEST_STAGING_METADATA - mock_validate_filenames = mocker.patch.object(repo_under_test, "validate_filenames") + mock_validate_filenames = mocker.patch.object( + repo_under_test, "validate_staging_metadata_filenames" + ) mock_getting_patient_info_from_pds = mocker.patch.object( bulk_upload_module, "getting_patient_info_from_pds" ) @@ -1269,7 +1579,9 @@ def test_validate_entry_happy_path(mocker, repo_under_test, mock_patient): mock_getting_patient_info_from_pds.return_value = mock_patient mock_validate_accessing_patient_data = mocker.patch.object( - repo_under_test, "validate_accessing_patient_data", return_value="some reason" + repo_under_test, + "validate_patient_data_access_conditions", + return_value="some reason", ) accepted_reason, patient_ods_code = repo_under_test.validate_entry(staging_metadata) @@ -1295,7 +1607,7 @@ def test_validate_entry_invalid_file_exception_triggers_write_to_dynamo( mocker.patch.object( repo_under_test, - "validate_filenames", + "validate_staging_metadata_filenames", side_effect=LGInvalidFilesException("invalid file"), ) mock_write_report = mocker.patch.object( @@ -1318,7 +1630,7 @@ def test_validate_entry_patient_record_exists_exception(mocker, repo_under_test) mocker.patch.object( repo_under_test, - "validate_filenames", + "validate_staging_metadata_filenames", side_effect=PatientRecordAlreadyExistException("record exists"), ) mock_write_report = mocker.patch.object( @@ -1341,7 +1653,7 @@ def test_validate_entry_invalid_nhs_number_exception(mocker, repo_under_test): mocker.patch.object( repo_under_test, - "validate_filenames", + "validate_staging_metadata_filenames", side_effect=InvalidNhsNumberException("bad nhs"), ) mock_write_report = mocker.patch.object( @@ -1361,9 +1673,9 @@ def test_validate_entry_invalid_nhs_number_exception(mocker, repo_under_test): def test_validate_accessing_patient_data_returns_none_when_pds_fhir_always_true( repo_under_test, mock_patient ): - repo_under_test.pds_fhir_always_true = True + repo_under_test.bypass_pds = True - result = repo_under_test.validate_accessing_patient_data( + result = repo_under_test.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, "A1234" ) @@ -1379,7 +1691,7 @@ def test_validate_accessing_patient_data_strict_mode_calls_strict_validation( return_value=False, ) - result = repo_under_test.validate_accessing_patient_data( + result = repo_under_test.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, "A1234" ) @@ -1410,8 +1722,7 @@ def mock_validate_lenient(mocker): def test_validate_accessing_patient_data_lenient_mode_calls_lenient_validation( lenient_repo, mock_patient, mock_validate_lenient, mock_ods_validation_true ): - - result = lenient_repo.validate_accessing_patient_data( + result = lenient_repo.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, "A1234" ) @@ -1429,7 +1740,7 @@ def test_validate_accessing_patient_data_adds_historic_name_reason_when_flag_tru return_value=("some reason", True), ) - result = lenient_repo.validate_accessing_patient_data( + result = lenient_repo.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, "A1234" ) @@ -1444,7 +1755,7 @@ def test_validate_accessing_patient_data_raises_exception_when_ods_code_not_allo with pytest.raises( LGInvalidFilesException, match="Patient not registered at your practice" ): - lenient_repo.validate_accessing_patient_data( + lenient_repo.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, "A1234" ) @@ -1452,12 +1763,11 @@ def test_validate_accessing_patient_data_raises_exception_when_ods_code_not_allo def test_validate_accessing_patient_data_adds_deceased_reason( mocker, lenient_repo, mock_patient, mock_validate_lenient, mock_ods_validation_true ): - deceased_status_mock = mocker.Mock() deceased_status_mock.name = "Formal" mock_patient.get_death_notification_status.return_value = deceased_status_mock - result = lenient_repo.validate_accessing_patient_data( + result = lenient_repo.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, "A1234" ) @@ -1473,7 +1783,7 @@ def test_validate_accessing_patient_data_adds_restricted_reason( "allowed_to_ingest_ods_code", return_value=True, ) - result = lenient_repo.validate_accessing_patient_data( + result = lenient_repo.validate_patient_data_access_conditions( ["file.pdf"], mock_patient, PatientOdsInactiveStatus.RESTRICTED ) From 17b87f324285dc481418478cbf6076a55f63823f Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 28 Aug 2025 09:08:44 +0100 Subject: [PATCH 09/12] [PRMT-580]- added a bit of docstring --- lambdas/services/bulk_upload_service_v2.py | 40 ++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index 91ba2d05a..d861c8b6f 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -61,6 +61,24 @@ def __init__(self, strict_mode, bypass_pds=False): self.bypass_pds = bypass_pds def process_message_queue(self, records: list): + """ + Processes a list of SQS messages from the bulk upload queue. + + Each message is processed individually using `handle_sqs_message`. If a PDS-related + exception occurs (e.g., rate limiting or PDS service failure), processing is paused, + remaining messages are returned to the queue, and a `BulkUploadException` is raised. + + For all other exceptions (e.g., client errors, invalid messages, or unexpected errors), + the message is marked as unhandled, and processing continues with the next message. + + After processing all messages, a summary of the processing outcome is logged. + + Args: + records (list): A list of SQS messages to process. + + Raises: + BulkUploadException: Raised if PDS-related rate limiting or service errors are encountered. + """ for index, message in enumerate(records, start=1): logger.info(f"Processing message {index} of {len(records)}") @@ -84,6 +102,28 @@ def process_message_queue(self, records: list): self.log_processing_summary(records) def handle_sqs_message(self, message: dict): + """ + Handles a single SQS message representing a bulk upload event. + + This method performs the following steps: + 1. Parses the message and constructs staging metadata. + 2. Validates the NHS number and file names. + 3. Performs additional validation checks such as patient access conditions + (e.g., deceased, restricted) and virus scan results. + 4. Initiates transactional operations and transfers the validated files. + 5. Removes the ingested files from the staging bucket. + 6. Logs the completion of ingestion and writes the report to DynamoDB. + 7. Sends metadata to the stitching queue for further processing. + + If at any point the validation fails (e.g., NHS number, virus scan), + the process exits early without performing file ingestion or reporting. + + Args: + message (dict): The SQS message payload containing bulk upload information. + + Returns: + None + """ logger.info("validate SQS event") staging_metadata = self.build_staging_metadata_from_message(message) logger.info("SQS event is valid. Validating NHS number and file names") From 58b43712eda0bd9d379d4e98c5f6f8c4c231a45c Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Fri, 29 Aug 2025 08:54:04 +0100 Subject: [PATCH 10/12] [PRMT-580]- fixed comments --- lambdas/services/bulk_upload_service_v2.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index d861c8b6f..fad22aad9 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -133,7 +133,7 @@ def handle_sqs_message(self, message: dict): logger.info( "NHS Number and filename validation complete." - "Validated strick mode, and if we can access the patient information ex:patient dead" + "Validated strict mode, and if we can access the patient information ex:patient dead" " Checking if virus scan has marked files as Clean" ) if not self.validate_virus_scan(staging_metadata, patient_ods_code): @@ -172,7 +172,6 @@ def handle_sqs_message(self, message: dict): ) def build_staging_metadata_from_message(self, message: dict) -> StagingMetadata: - logger.info("Validating SQS event") try: staging_metadata_json = message["body"] return StagingMetadata.model_validate_json(staging_metadata_json) From 8e65da90ed1c5637eba9d9532c4916d25208c63b Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Fri, 29 Aug 2025 14:35:38 +0100 Subject: [PATCH 11/12] Update lambdas/services/bulk_upload_service_v2.py Co-authored-by: Mohammad Iqbal <127403145+MohammadIqbalAD-NHS@users.noreply.github.com> --- lambdas/services/bulk_upload_service_v2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index fad22aad9..e158084d7 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -133,7 +133,7 @@ def handle_sqs_message(self, message: dict): logger.info( "NHS Number and filename validation complete." - "Validated strict mode, and if we can access the patient information ex:patient dead" + "Validated strict mode, and patient information is accessible (e.g. patient not deceased/restricted) " Checking if virus scan has marked files as Clean" ) if not self.validate_virus_scan(staging_metadata, patient_ods_code): From b781389479b2d2b547421d03db5a8ba3237e99a5 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Tue, 2 Sep 2025 08:40:54 +0100 Subject: [PATCH 12/12] [PRMT-580]- added closing quotes --- lambdas/services/bulk_upload_service_v2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/services/bulk_upload_service_v2.py b/lambdas/services/bulk_upload_service_v2.py index e158084d7..7d37ee249 100644 --- a/lambdas/services/bulk_upload_service_v2.py +++ b/lambdas/services/bulk_upload_service_v2.py @@ -133,7 +133,7 @@ def handle_sqs_message(self, message: dict): logger.info( "NHS Number and filename validation complete." - "Validated strict mode, and patient information is accessible (e.g. patient not deceased/restricted) + "Validated strict mode, and patient information is accessible (e.g. patient not deceased/restricted)" " Checking if virus scan has marked files as Clean" ) if not self.validate_virus_scan(staging_metadata, patient_ods_code):