Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions site/cds_rdm/administration/harvester_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,8 @@ def init_search_config(self, **kwargs):
headers=self.get_search_request_headers(**kwargs),
pagination_options=(20, 50),
default_size=20,
hidden_params=[["action", "record.publish"]],
)
hidden_params=[
["action", "record.publish"],
["user_id", "system"],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check, by any chance, what the REST API returns for audit logs? We need to make sure the REST API does not return wrong entries for the curators role (on the audit logs endpoint directly, not on this admin one).

],
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import { DownloadButton } from "./DownloadButton";
* Custom SearchBar component with run selector
*/
const SearchBarComponent = ({ updateQueryState, currentQueryState }) => {
const hiddenParams = [
["action", "record.publish"],
["user_id", "system"],
];

// Get runs from data attributes
const domContainer = document.getElementById("invenio-search-config");
const runs = JSON.parse(domContainer?.dataset.harvesterRuns || "[]");
Expand Down Expand Up @@ -48,7 +53,7 @@ const SearchBarComponent = ({ updateQueryState, currentQueryState }) => {
updateQueryState({
...currentQueryState,
queryString,
hiddenParams: [["action", "record.publish"]],
hiddenParams,
});
};

Expand All @@ -61,7 +66,7 @@ const SearchBarComponent = ({ updateQueryState, currentQueryState }) => {
updateQueryState({
...currentQueryState,
queryString: inputValue,
hiddenParams: [["action", "record.publish"]],
hiddenParams,
});
};

Expand Down
168 changes: 140 additions & 28 deletions site/cds_rdm/harvester_download/resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,25 @@

"""Harvester download resource."""

import re
from datetime import datetime

from flask import Response, g, request, stream_with_context
from flask import Response, current_app, request
from flask_resources import Resource, route
from invenio_audit_logs.proxies import current_audit_logs_service
from invenio_jobs.models import Run
from invenio_search import current_search_client
from invenio_search.utils import prefix_index

from cds_rdm.administration.permissions import curators_permission


class HarvesterDownloadResource(Resource):
"""Harvester download resource."""

TIMESTAMP_QUERY_PATTERN = re.compile(
r'@timestamp:\["?([^"\]]+)"?\s+TO\s+"?([^"\]]+|\*)"?\]'
)

def create_url_rules(self):
"""Create the URL rules for the download resource."""
routes = self.config.routes
Expand All @@ -27,44 +34,149 @@ def create_url_rules(self):
]

def download(self):
"""Download audit logs for harvester reports as plain text file."""
"""Download a harvester run's logs as a plain-text ``.log`` file.

Mirrors the admin job-run page: status header, failure banner,
truncation warning, and task-grouped entries formatted as
``[yyyy-MM-dd HH:mm] LEVEL message``.
"""
permission = curators_permission
if not permission.can():
return {"message": "Permission denied"}, 403

query = request.args.get("q", "")
action = request.args.get("action", "")

if not query:
return {"message": "No query provided"}, 400

params = {"q": query, "size": 1000}
if action:
params["action"] = action

result = current_audit_logs_service.search(
identity=g.identity,
params=params,
)

def generate_logs():
"""Generate log lines one by one."""
for hit in result.hits:
timestamp = hit.get("created", "N/A")
action = hit.get("action", "N/A")
resource_type = hit.get("resource", {}).get("type", "N/A")
resource_id = hit.get("resource", {}).get("id", "N/A")
user_email = hit.get("user", {}).get("email", "N/A")

# Format: [timestamp] action resource_type/resource_id user
line = f"[{timestamp}] {action} {resource_type}/{resource_id} {user_email}\n"
yield line
timestamp_match = self.TIMESTAMP_QUERY_PATTERN.search(query)
if not timestamp_match:
return {"message": "Invalid harvester run query"}, 400

start_time, end_time = timestamp_match.groups()
try:
started_at = datetime.fromisoformat(start_time)
except ValueError:
return {"message": "Invalid harvester run query"}, 400

run_query = Run.query.filter_by(started_at=started_at, parent_run_id=None)
Copy link
Copy Markdown
Contributor

@kpsherva kpsherva Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand the intention correctly: are you selecting the run based only on its started-at time? If so, I would make it more precise: it needs to be a run of the specific harvester task type only (not of a job, because a job is an instance of a Celery task, and if it has not been created on the environment, it will not exist).
Otherwise, if another run (of a different type) started at the same time (unlikely, but not impossible), you would return the logs of that other run.

Also, after additional thought... wouldn't it be better to rely on the run identifier rather than on its timestamp? An identifier would be more reliable, no?

if end_time != "*":
try:
finished_at = datetime.fromisoformat(end_time)
except ValueError:
return {"message": "Invalid harvester run query"}, 400
run_query = run_query.filter_by(finished_at=finished_at)
else:
run_query = run_query.filter_by(finished_at=None)

run = run_query.one_or_none()
if not run:
return {"message": "Run not found"}, 404

# Query the job-logs index directly: the stock JobLogsPermissionPolicy
# has ``can_read = [Disable()]``, which blocks every non-superuser
# identity from reading logs via ``current_jobs_logs_service``. The
# caller is already gated by ``curators_permission.can()`` above.
full_index_name = prefix_index(current_app.config["JOBS_LOGGING_INDEX"])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the service layer here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

max_results = current_app.config.get("JOBS_LOGS_MAX_RESULTS", 2000)
search_query = {
"query": {
"bool": {
"filter": [
{"term": {"context.run_id": str(run.id)}},
{"term": {"context.job_id": str(run.job_id)}},
]
}
},
"sort": [
{"@timestamp": {"order": "asc"}},
{"_id": {"order": "asc"}},
],
"size": max_results,
"track_total_hits": True,
}

try:
response = current_search_client.search(
index=full_index_name, body=search_query
)
except Exception:
current_app.logger.exception(
"Failed to fetch structured job logs for harvester run %s", run.id
)
response = {}

hits = response.get("hits", {}).get("hits", [])
total = response.get("hits", {}).get("total", {}).get("value", len(hits))

def _format_timestamp(raw):
# Admin UI (RunsLogs.js) format.
if not raw:
return "N/A"
try:
return datetime.fromisoformat(
raw.replace("Z", "+00:00")
).strftime("%Y-%m-%d %H:%M")
except (ValueError, TypeError):
return raw

# Group by context.task_id in first-seen order (RunsLogs.js buildLogTree).
task_groups = {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't this grouping already done somewhere in the code? I am having flashbacks :)

seen = set()
error_count = 0
for hit in hits:
src = hit.get("_source", {})
raw_ts = src.get("@timestamp")
level = src.get("level", "INFO")
# Collapse whitespace so multi-line errors render on one line
# (admin UI does the same via ``white-space: normal``).
message = re.sub(r"\s+", " ", (src.get("message") or "")).strip()
key = (raw_ts, level, message)
if key in seen:
continue
seen.add(key)
if level == "ERROR":
error_count += 1
task_id = (src.get("context") or {}).get("task_id") or "unknown"
task_groups.setdefault(task_id, []).append(
f"[{_format_timestamp(raw_ts)}] {level} {message}"
)

lines = [line for group in task_groups.values() for line in group]

# Admin UI header: run metadata + FAILED/PARTIAL_SUCCESS banner.
header = []
status = getattr(run.status, "name", str(run.status))
header.append(f"Status: {status}")
header.append(f"Started: {_format_timestamp(run.started_at.isoformat())}")
if run.finished_at:
header.append(
f"Finished: {_format_timestamp(run.finished_at.isoformat())}"
)
if status in ("FAILED", "PARTIAL_SUCCESS"):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would include the successful ones too, because they might contain warnings (or debug or info entries, depending on the settings), even if it was a successful run.

banner = "Job failed" if status == "FAILED" else "Job partially succeeded"
header.append("")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the end result look like? Can you attach an example file?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

header.append(banner)
if run.message:
header.append(run.message)
if error_count:
header.append(f"{error_count} error(s) found in logs below")
if total and total > len(lines):
header.append(
f"Showing first {len(lines)} of {total} log entries "
f"(truncated at JOBS_LOGS_MAX_RESULTS={max_results})."
)
header.append("=" * 80)

logs = "\n".join(header + lines)

if not lines:
logs += "\n" + (run.message or "No logs available for this run.\n")

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"harvester_logs_{timestamp}.txt"
filename = f"harvester_logs_{run.id}_{timestamp}.log"

return Response(
stream_with_context(generate_logs()),
logs,
mimetype="text/plain",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
3 changes: 3 additions & 0 deletions site/cds_rdm/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class CDSRDMRecordPermissionPolicy(RDMRecordPermissionPolicy):
can_create = [AuthenticatedRegularUser(), SystemProcess()]
can_read = RDMRecordPermissionPolicy.can_read + [ArchiverRead()]
can_search = RDMRecordPermissionPolicy.can_search + [ArchiverRead()]
can_search_revisions = RDMRecordPermissionPolicy.can_search_revisions + [
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So curators can use the "View Changes" button on the Harvester Reports page.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make sure they can only see revisions made by the system user?
If they can see more, let's discuss how to tackle it before jumping to an implementation.

HarvesterCurator()
]
can_read_files = RDMRecordPermissionPolicy.can_read_files + [ArchiverRead()]
can_get_content_files = RDMRecordPermissionPolicy.can_get_content_files + [
ArchiverRead()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ <h3 style="margin-top: 0; color: #856404; font-size: 16px;">Summary</h3>
</div>
{% endif %}

<!-- Call to action button -->
<div style="text-align: center; margin: 30px 0;">
<a href="{{ run_url }}"
style="display: inline-block; padding: 12px 30px; background-color: {{ status_info.color }}; color: white; text-decoration: none; border-radius: 5px; font-weight: bold; font-size: 16px;">
View Full Details
</a>
</div>

{% if job.task == "process_inspire" %}
<!-- Harvester-specific actions (only for INSPIRE harvester jobs) -->
{% set start_time = run.started_at | string | replace(' ', 'T') %}
Expand Down Expand Up @@ -122,4 +114,4 @@ <h3 style="margin-top: 0; color: #333; font-size: 16px;">Harvester Actions</h3>
{% endif %}
</details>
</div>
{% endblock content %}
{% endblock content %}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

{{ status_info.action }}

View full details: {{ run_url }}

{% if run.status.name == "PARTIAL_SUCCESS" and run.errored_entries and run.total_entries %}
Summary:
- Successfully processed: {{ success_count }} items
Expand Down
Loading