-
Notifications
You must be signed in to change notification settings - Fork 25
fix: improve harvester reports access and log downloads #773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,18 +7,25 @@ | |
|
|
||
| """Harvester download resource.""" | ||
|
|
||
| import re | ||
| from datetime import datetime | ||
|
|
||
| from flask import Response, g, request, stream_with_context | ||
| from flask import Response, current_app, request | ||
| from flask_resources import Resource, route | ||
| from invenio_audit_logs.proxies import current_audit_logs_service | ||
| from invenio_jobs.models import Run | ||
| from invenio_search import current_search_client | ||
| from invenio_search.utils import prefix_index | ||
|
|
||
| from cds_rdm.administration.permissions import curators_permission | ||
|
|
||
|
|
||
| class HarvesterDownloadResource(Resource): | ||
| """Harvester download resource.""" | ||
|
|
||
| TIMESTAMP_QUERY_PATTERN = re.compile( | ||
| r'@timestamp:\["?([^"\]]+)"?\s+TO\s+"?([^"\]]+|\*)"?\]' | ||
| ) | ||
|
|
||
| def create_url_rules(self): | ||
| """Create the URL rules for the download resource.""" | ||
| routes = self.config.routes | ||
|
|
@@ -27,44 +34,149 @@ def create_url_rules(self): | |
| ] | ||
|
|
||
| def download(self): | ||
| """Download audit logs for harvester reports as plain text file.""" | ||
| """Download a harvester run's logs as a plain-text ``.log`` file. | ||
|
|
||
| Mirrors the admin job-run page: status header, failure banner, | ||
| truncation warning, and task-grouped entries formatted as | ||
| ``[yyyy-MM-dd HH:mm] LEVEL message``. | ||
| """ | ||
| permission = curators_permission | ||
| if not permission.can(): | ||
| return {"message": "Permission denied"}, 403 | ||
|
|
||
| query = request.args.get("q", "") | ||
| action = request.args.get("action", "") | ||
|
|
||
| if not query: | ||
| return {"message": "No query provided"}, 400 | ||
|
|
||
| params = {"q": query, "size": 1000} | ||
| if action: | ||
| params["action"] = action | ||
|
|
||
| result = current_audit_logs_service.search( | ||
| identity=g.identity, | ||
| params=params, | ||
| ) | ||
|
|
||
| def generate_logs(): | ||
| """Generate log lines one by one.""" | ||
| for hit in result.hits: | ||
| timestamp = hit.get("created", "N/A") | ||
| action = hit.get("action", "N/A") | ||
| resource_type = hit.get("resource", {}).get("type", "N/A") | ||
| resource_id = hit.get("resource", {}).get("id", "N/A") | ||
| user_email = hit.get("user", {}).get("email", "N/A") | ||
|
|
||
| # Format: [timestamp] action resource_type/resource_id user | ||
| line = f"[{timestamp}] {action} {resource_type}/{resource_id} {user_email}\n" | ||
| yield line | ||
| timestamp_match = self.TIMESTAMP_QUERY_PATTERN.search(query) | ||
| if not timestamp_match: | ||
| return {"message": "Invalid harvester run query"}, 400 | ||
|
|
||
| start_time, end_time = timestamp_match.groups() | ||
| try: | ||
| started_at = datetime.fromisoformat(start_time) | ||
| except ValueError: | ||
| return {"message": "Invalid harvester run query"}, 400 | ||
|
|
||
| run_query = Run.query.filter_by(started_at=started_at, parent_run_id=None) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I understand the intention correctly - are you selecting the run based only on the started at time? if yes - I would make it more precise - it needs to be a run only of the specific harvester type of task (not a job - because a job is an instance of celery task, and if it is not created on the environment, then it will not exist) also, after additional thought... wouldn't it be better to rely on the run identifier rather than on its timestamp? identifier would be more reliable, no? |
||
| if end_time != "*": | ||
| try: | ||
| finished_at = datetime.fromisoformat(end_time) | ||
| except ValueError: | ||
| return {"message": "Invalid harvester run query"}, 400 | ||
| run_query = run_query.filter_by(finished_at=finished_at) | ||
| else: | ||
| run_query = run_query.filter_by(finished_at=None) | ||
|
|
||
| run = run_query.one_or_none() | ||
| if not run: | ||
| return {"message": "Run not found"}, 404 | ||
|
|
||
| # Query the job-logs index directly: the stock JobLogsPermissionPolicy | ||
| # has ``can_read = [Disable()]``, which blocks every non-superuser | ||
| # identity from reading logs via ``current_jobs_logs_service``. The | ||
| # caller is already gated by ``curators_permission.can()`` above. | ||
| full_index_name = prefix_index(current_app.config["JOBS_LOGGING_INDEX"]) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use the service layer here.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree |
||
| max_results = current_app.config.get("JOBS_LOGS_MAX_RESULTS", 2000) | ||
| search_query = { | ||
| "query": { | ||
| "bool": { | ||
| "filter": [ | ||
| {"term": {"context.run_id": str(run.id)}}, | ||
| {"term": {"context.job_id": str(run.job_id)}}, | ||
| ] | ||
| } | ||
| }, | ||
| "sort": [ | ||
| {"@timestamp": {"order": "asc"}}, | ||
| {"_id": {"order": "asc"}}, | ||
| ], | ||
| "size": max_results, | ||
| "track_total_hits": True, | ||
| } | ||
|
|
||
| try: | ||
| response = current_search_client.search( | ||
| index=full_index_name, body=search_query | ||
| ) | ||
| except Exception: | ||
| current_app.logger.exception( | ||
| "Failed to fetch structured job logs for harvester run %s", run.id | ||
| ) | ||
| response = {} | ||
|
|
||
| hits = response.get("hits", {}).get("hits", []) | ||
| total = response.get("hits", {}).get("total", {}).get("value", len(hits)) | ||
|
|
||
| def _format_timestamp(raw): | ||
| # Admin UI (RunsLogs.js) format. | ||
| if not raw: | ||
| return "N/A" | ||
| try: | ||
| return datetime.fromisoformat( | ||
| raw.replace("Z", "+00:00") | ||
| ).strftime("%Y-%m-%d %H:%M") | ||
| except (ValueError, TypeError): | ||
| return raw | ||
|
|
||
| # Group by context.task_id in first-seen order (RunsLogs.js buildLogTree). | ||
| task_groups = {} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wasn't this grouping already done somewhere in the code? I am having flashbacks :) |
||
| seen = set() | ||
| error_count = 0 | ||
| for hit in hits: | ||
| src = hit.get("_source", {}) | ||
| raw_ts = src.get("@timestamp") | ||
| level = src.get("level", "INFO") | ||
| # Collapse whitespace so multi-line errors render on one line | ||
| # (admin UI does the same via ``white-space: normal``). | ||
| message = re.sub(r"\s+", " ", (src.get("message") or "")).strip() | ||
| key = (raw_ts, level, message) | ||
| if key in seen: | ||
| continue | ||
| seen.add(key) | ||
| if level == "ERROR": | ||
| error_count += 1 | ||
| task_id = (src.get("context") or {}).get("task_id") or "unknown" | ||
| task_groups.setdefault(task_id, []).append( | ||
| f"[{_format_timestamp(raw_ts)}] {level} {message}" | ||
| ) | ||
|
|
||
| lines = [line for group in task_groups.values() for line in group] | ||
|
|
||
| # Admin UI header: run metadata + FAILED/PARTIAL_SUCCESS banner. | ||
| header = [] | ||
| status = getattr(run.status, "name", str(run.status)) | ||
| header.append(f"Status: {status}") | ||
| header.append(f"Started: {_format_timestamp(run.started_at.isoformat())}") | ||
| if run.finished_at: | ||
| header.append( | ||
| f"Finished: {_format_timestamp(run.finished_at.isoformat())}" | ||
| ) | ||
| if status in ("FAILED", "PARTIAL_SUCCESS"): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would include the success ones too, because it might contain warnings (or debug or info, depending on the settings), even if it was successful run |
||
| banner = "Job failed" if status == "FAILED" else "Job partially succeeded" | ||
| header.append("") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does the end result look like? Can you attach an example file?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| header.append(banner) | ||
| if run.message: | ||
| header.append(run.message) | ||
| if error_count: | ||
| header.append(f"{error_count} error(s) found in logs below") | ||
| if total and total > len(lines): | ||
| header.append( | ||
| f"Showing first {len(lines)} of {total} log entries " | ||
| f"(truncated at JOBS_LOGS_MAX_RESULTS={max_results})." | ||
| ) | ||
| header.append("=" * 80) | ||
|
|
||
| logs = "\n".join(header + lines) | ||
|
|
||
| if not lines: | ||
| logs += "\n" + (run.message or "No logs available for this run.\n") | ||
|
|
||
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | ||
| filename = f"harvester_logs_{timestamp}.txt" | ||
| filename = f"harvester_logs_{run.id}_{timestamp}.log" | ||
|
|
||
| return Response( | ||
| stream_with_context(generate_logs()), | ||
| logs, | ||
| mimetype="text/plain", | ||
| headers={"Content-Disposition": f'attachment; filename="{filename}"'}, | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -71,6 +71,9 @@ class CDSRDMRecordPermissionPolicy(RDMRecordPermissionPolicy): | |
| can_create = [AuthenticatedRegularUser(), SystemProcess()] | ||
| can_read = RDMRecordPermissionPolicy.can_read + [ArchiverRead()] | ||
| can_search = RDMRecordPermissionPolicy.can_search + [ArchiverRead()] | ||
| can_search_revisions = RDMRecordPermissionPolicy.can_search_revisions + [ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this needed?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So curators can use the "View Changes" button on the Harvester Reports page.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you make sure they can only see revisions made by system user? |
||
| HarvesterCurator() | ||
| ] | ||
| can_read_files = RDMRecordPermissionPolicy.can_read_files + [ArchiverRead()] | ||
| can_get_content_files = RDMRecordPermissionPolicy.can_get_content_files + [ | ||
| ArchiverRead() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you check by any chance what does the REST API return for audit logs? we need to make sure REST API does not return wrong entries for curators role (on audit logs endpoint directly, not on this admin one)