Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -275,4 +275,5 @@ conf/*
apps/worker/celerybeat-schedule.db
/apps/api/conf/*
!/apps/api/conf/
!/apps/api/conf/.gitkeep
!/apps/api/conf/.gitkeep
.idea/FastFetchBot.iml
9 changes: 9 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ MongoDB models and connection logic live in `packages/shared/fastfetchbot_shared
- On subsequent cache hits, `media_files_packaging` uses stored file_ids directly via `InputMediaPhoto(file_id)` etc., skipping HTTP download entirely
- The bot has no direct MongoDB access — all database writes go through the async worker via Redis

**Future media asset caching plan** — target architecture for decoupling external media handling from the Telegram bot:
- Current bot-side video probing with `ffprobe` is only a workaround for Telegram iOS aspect-ratio rendering issues. It should not grow into a full media processing pipeline inside `apps/telegram-bot`
- Move external media download, video probing (`width`, `height`, `duration`), and any future remux/transcode work into the worker layer (`apps/worker` for synchronous heavy I/O, coordinated by API/async-worker as needed)
- Introduce a durable `MediaAsset` cache backed by MongoDB metadata plus object storage such as S3. Suggested fields: source URL/hash, media type, MIME type, file size, storage key, width, height, duration, processing status, error details, expiration/lifecycle fields, and `telegram_file_id`
- Let the bot consume prepared media only: prefer `telegram_file_id` first; otherwise use a ready local/object-storage asset with known dimensions; avoid downloading or probing arbitrary remote media in the bot event loop
- Keep Telegram `file_id` as the highest-priority cache because it avoids both external downloads and object-storage reads on repeated sends
- Use object storage primarily for first sends, unstable external sources, sources requiring cookies/referer, and cases where media metadata must be known before Telegram upload
- Add lifecycle cleanup and deduplication by canonical source URL/content hash before enabling long-term object storage caching

**SQLite/PostgreSQL** (user settings — always enabled for the Telegram bot):
| Variable | Default | Description |
|----------|---------|-------------|
Expand Down
40 changes: 31 additions & 9 deletions apps/async-worker/async_worker/services/file_id_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

from async_worker.config import settings
from fastfetchbot_shared.utils.logger import logger
from fastfetchbot_shared.utils.number import positive_int

FILEID_QUEUE_KEY = "fileid:updates"
FILEID_DLQ_KEY = "fileid:updates:dlq"
_MAX_RETRIES = 3
VIDEO_METADATA_FIELDS = ("width", "height", "duration")

_redis: aioredis.Redis | None = None
_consumer_task: asyncio.Task | None = None
Expand Down Expand Up @@ -44,7 +46,9 @@ async def _consume_loop() -> None:
await r.lpush(FILEID_QUEUE_KEY, raw_payload)
logger.info("Requeued in-flight payload before shutdown")
except Exception:
logger.warning(f"Failed to requeue payload on shutdown: {raw_payload}")
logger.warning(
f"Failed to requeue payload on shutdown: {raw_payload}"
)
Comment on lines +49 to +51
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use logger.exception() in this exception path.

This branch is already handling a real exception during shutdown requeue, so logger.warning(...) drops the traceback exactly where you need it for queue-loss debugging.

Proposed fix
                 except Exception:
-                    logger.warning(
+                    logger.exception(
                         f"Failed to requeue payload on shutdown: {raw_payload}"
                     )

As per coding guidelines, **/*.py: Always use logger.exception() for logging errors with tracebacks and logger.error() for message-only logging; never use print() or traceback.print_exc().

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/async-worker/async_worker/services/file_id_consumer.py` around lines
49-51, in the shutdown requeue exception path in file_id_consumer.py (the block
that currently calls logger.warning with raw_payload), replace the warning call
with logger.exception so the traceback is captured; keep the existing message
context (e.g. "Failed to requeue payload on shutdown") and include raw_payload
in the log call (use logger.exception("Failed to requeue payload on shutdown:
%s", raw_payload)) so errors during the requeue in that function/path are logged
with the full stack trace.

logger.info("file_id consumer cancelled, shutting down")
break
except json.JSONDecodeError as e:
Expand All @@ -70,11 +74,15 @@ async def _consume_loop() -> None:
try:
# Stamp retry count so we can detect repeated failures.
payload["_retry_count"] = retry_count + 1
await r.lpush(FILEID_QUEUE_KEY, json.dumps(payload, ensure_ascii=False))
await r.lpush(
FILEID_QUEUE_KEY, json.dumps(payload, ensure_ascii=False)
)
except Exception:
logger.warning(f"Failed to requeue payload: {raw_payload}")
else:
logger.error(f"Max retries ({_MAX_RETRIES}) exceeded, moving to DLQ: {raw_payload}")
logger.error(
f"Max retries ({_MAX_RETRIES}) exceeded, moving to DLQ: {raw_payload}"
)
try:
await r.lpush(FILEID_DLQ_KEY, raw_payload)
except Exception:
Expand All @@ -93,9 +101,12 @@ async def _process_file_id_update(payload: dict) -> None:
logger.warning(f"Invalid file_id update payload: {payload}")
return

doc = await Metadata.find(
Metadata.url == metadata_url
).sort("-version").limit(1).first_or_none()
doc = (
await Metadata.find(Metadata.url == metadata_url)
.sort("-version")
.limit(1)
.first_or_none()
)

if doc is None or not doc.media_files:
logger.warning(f"No metadata found for file_id update: {metadata_url}")
Expand All @@ -104,9 +115,20 @@ async def _process_file_id_update(payload: dict) -> None:
matched = 0
for update in updates:
for mf in doc.media_files:
if mf.url == update["url"] and mf.telegram_file_id is None:
mf.telegram_file_id = update["telegram_file_id"]
matched += 1
if mf.url == update["url"]:
changed = False
if mf.telegram_file_id is None:
mf.telegram_file_id = update["telegram_file_id"]
changed = True
if update.get("media_type") == "video":
for field in VIDEO_METADATA_FIELDS:
value = positive_int(update.get(field))
if value is not None and getattr(mf, field, None) != value:
setattr(mf, field, value)
changed = True
if changed:
matched += 1
break

if matched > 0:
await doc.save()
Expand Down
4 changes: 3 additions & 1 deletion apps/telegram-bot/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COPY --from=ghcr.io/astral-sh/uv:0.10.4 /uv /usr/local/bin/uv
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
libmagic1 \
ffmpeg \
build-essential

# copy workspace files for dependency resolution
Expand All @@ -42,7 +43,8 @@ FROM python-base AS production
ENV PYTHONPATH=/app/apps/telegram-bot:$PYTHONPATH
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
libmagic1
libmagic1 \
ffmpeg
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH
COPY packages/ /app/packages/
COPY apps/telegram-bot/ /app/apps/telegram-bot/
Expand Down
32 changes: 26 additions & 6 deletions apps/telegram-bot/core/services/file_id_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

from core.config import settings
from fastfetchbot_shared.utils.logger import logger
from fastfetchbot_shared.utils.number import positive_int

FILEID_QUEUE_KEY = "fileid:updates"
VIDEO_METADATA_FIELDS = ("width", "height", "duration")

_redis: aioredis.Redis | None = None

Expand All @@ -34,6 +36,18 @@ def extract_file_id(message: Message, media_type: str) -> Optional[str]:
return None


def _video_metadata_for_update(info: dict, message: Message) -> dict[str, int]:
    """Collect video metadata fields to attach to a file_id update.

    For every name in ``VIDEO_METADATA_FIELDS`` the value stored in ``info``
    is tried first. When ``positive_int`` rejects it (returns ``None``) and
    the sent message exposes a truthy ``video`` attribute, the same-named
    attribute of that video object is tried as a fallback. Fields for which
    neither source yields a usable value are left out of the result.

    Args:
        info: Media description dict; looked up by field name via ``.get``.
        message: The sent message; its ``video`` attribute (if any) is the
            fallback source.

    Returns:
        A dict containing only the fields that resolved to a non-``None``
        value.
    """
    sent_video = getattr(message, "video", None)
    collected = {}
    for name in VIDEO_METADATA_FIELDS:
        candidate = positive_int(info.get(name))
        # Fall back to the video object only when info had nothing usable.
        if candidate is None and sent_video:
            candidate = positive_int(getattr(sent_video, name, None))
        if candidate is None:
            continue
        collected[name] = candidate
    return collected


async def capture_and_push_file_ids(
uncached_info: list[dict],
sent_messages: tuple[Message, ...],
Expand All @@ -57,21 +71,27 @@ async def capture_and_push_file_ids(
break
file_id = extract_file_id(sent_messages[i], info["media_type"])
if file_id:
file_id_updates.append({
update = {
"url": info["url"],
"media_type": info["media_type"],
"telegram_file_id": file_id,
})
}
if info["media_type"] == "video":
update.update(_video_metadata_for_update(info, sent_messages[i]))
file_id_updates.append(update)

if not file_id_updates:
return

try:
r = await _get_redis()
payload = json.dumps({
"metadata_url": metadata_url,
"file_id_updates": file_id_updates,
}, ensure_ascii=False)
payload = json.dumps(
{
"metadata_url": metadata_url,
"file_id_updates": file_id_updates,
},
ensure_ascii=False,
)
await r.lpush(FILEID_QUEUE_KEY, payload)
logger.info(f"Pushed {len(file_id_updates)} file_id updates for {metadata_url}")
except Exception:
Expand Down
Loading
Loading