PYTHON-5745 Consolidate command telemetry #2891
Codecov Report: All modified and coverable lines are covered by tests.
Pull request overview
This PR refactors command lifecycle telemetry by centralizing structured command logging and APM command monitoring event publishing into a single internal helper (_CommandTelemetry) in pymongo/_telemetry.py, and updates both sync/async command execution paths to use it.
Changes:
- Added pymongo/_telemetry.py with _CommandTelemetry to unify STARTED/SUCCEEDED/FAILED command logging + APM publishing and duration tracking.
- Updated sync/async command_runner.py to delegate command telemetry to _CommandTelemetry and simplified call signatures (removing explicit address/start plumbing).
- Introduced _ConnectionTelemetryInfo protocol and updated sync/async pool connection classes to provide the telemetry-required connection fields.
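For illustration only, the centralization described above could look roughly like the sketch below. It is not the code in pymongo/_telemetry.py: `CommandTelemetrySketch`, `run_command_sketch`, and the `sink` callable are hypothetical names, and the real helper also handles structured logging and APM listeners, which are reduced here to a single callback.

```python
import datetime
from typing import Any, Callable, Dict

# Hypothetical event sink: receives an event kind and a payload dict.
Sink = Callable[[str, Dict[str, Any]], None]


class CommandTelemetrySketch:
    """Illustrative stand-in for _CommandTelemetry, not the pymongo class."""

    def __init__(self, sink: Sink, command_name: str) -> None:
        self._sink = sink
        self._command_name = command_name
        self._start = datetime.datetime.now()
        self.duration = datetime.timedelta(0)

    def started(self) -> None:
        # The helper owns the clock, so callers pass no start time around.
        self._start = datetime.datetime.now()
        self._sink("STARTED", {"commandName": self._command_name})

    def succeeded(self, reply: Dict[str, Any]) -> None:
        self.duration = datetime.datetime.now() - self._start
        self._sink("SUCCEEDED", {"commandName": self._command_name, "reply": reply})

    def failed(self, exc: BaseException) -> None:
        self.duration = datetime.datetime.now() - self._start
        self._sink("FAILED", {"commandName": self._command_name, "failure": repr(exc)})


def run_command_sketch(
    telemetry: CommandTelemetrySketch, send: Callable[[], Dict[str, Any]]
) -> Dict[str, Any]:
    """The runner only reports lifecycle transitions to the helper."""
    telemetry.started()
    try:
        reply = send()
    except Exception as exc:
        telemetry.failed(exc)
        raise
    telemetry.succeeded(reply)
    return reply
```

With this shape, the sync and async runners would differ only in how `send` is awaited, while the event bookkeeping stays in one place.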
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pymongo/_telemetry.py | New centralized internal helper for command telemetry (logging + APM publishing). |
| pymongo/pool_shared.py | Adds _ConnectionTelemetryInfo protocol to type required connection metadata for telemetry. |
| pymongo/message.py | Removes bulk write start_time tracking now handled by telemetry helper. |
| pymongo/synchronous/command_runner.py | Replaces inline telemetry logic with _CommandTelemetry and updates _run_command signature. |
| pymongo/asynchronous/command_runner.py | Async equivalent refactor to _CommandTelemetry and updated _run_command signature. |
| pymongo/synchronous/pool.py | Sync Connection updated to satisfy telemetry connection interface; updates run_command call plumbing. |
| pymongo/asynchronous/pool.py | Async AsyncConnection updated to satisfy telemetry connection interface; updates run_command call plumbing. |
| pymongo/synchronous/server.py | Removes now-unneeded start/address telemetry plumbing for cursor ops. |
| pymongo/asynchronous/server.py | Async equivalent removal of start/address telemetry plumbing for cursor ops. |
    self._duration = datetime.datetime.now() - self._start
    if not self._should_log and not self._publish:
        return
    duration = self._duration
Do we need this assignment? self._duration should only get assigned once per _CommandTelemetry lifecycle.
        request_id: int,
        op_id: Optional[int],
    ) -> None:
        self._topology_id = topology_id
We construct the whole object even if self.should_log = False. Does it make sense to have the constructor short-circuit with that as the first check?
I don't think a short-circuit makes sense in the constructor, but I added a short-circuit in the methods
        self._start: datetime.datetime
        self._duration: datetime.timedelta

    def started(self, orig: MutableMapping[str, Any], ensure_db: bool) -> None:
Could we refactor started, succeeded, failed (or a subset) to use a common internal helper like _emit? All three are very similar and effectively just pass through kwargs to debug_log.
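A minimal sketch of what such a shared helper might look like, assuming the three methods really do differ only in the fields they pass along. `EmitSketch`, `debug_log_sketch`, and the field names are hypothetical and stand in for the PR's actual logging call:

```python
from typing import Any, Callable


def debug_log_sketch(**fields: Any) -> None:
    """Hypothetical stand-in for the debug_log call mentioned above."""
    print(fields)


class EmitSketch:
    def __init__(
        self,
        should_log: bool,
        log: Callable[..., None] = debug_log_sketch,
        **common: Any,
    ) -> None:
        self._should_log = should_log
        self._log = log
        self._common = common  # fields shared by every lifecycle message

    def _emit(self, message: str, **fields: Any) -> None:
        # Single place for the gating check and the shared fields.
        if not self._should_log:
            return
        self._log(message=message, **self._common, **fields)

    def started(self, command: Any) -> None:
        self._emit("Command started", command=command)

    def succeeded(self, reply: Any) -> None:
        self._emit("Command succeeded", reply=reply)

    def failed(self, failure: Any) -> None:
        self._emit("Command failed", failure=failure)
```

Whether this is worth it depends on how much the three methods diverge once APM publishing is included; the sketch covers the logging half only.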
    from pymongo.typings import _Address


    class _ConnectionTelemetryInfo(Protocol):
Is there a reason to use this over the existing _AgnosticConnection?
Re #discussion_r3475117295: Two reasons: (1) _AgnosticConnection is Union["AsyncConnection", "Connection"] — a type alias, not a class, so AsyncConnection/Connection can't inherit from it. (2) _AgnosticConnection is defined in typings.py and forward-references both pool classes, which would introduce a circular import when used from pool_shared.py. The Protocol also narrows the surface to exactly the four fields _CommandTelemetry needs, making the contract explicit.
I didn't mean to send the raw response. My main goal was to have a properties-only subset that would be used here and probably the pool telemetry as well.
Is the idea to have a single, minimal listing of connection attributes that we need for telemetry? I don't think either of the AI responses is actually true in this context, since _ConnectionTelemetryInfo is used purely as a type annotation.
Yeah, ignore the AI response
I'm fine with trying _ConnectionTelemetryInfo out, we can remove it in a future refactor or consolidation if it turns out to be less useful than expected here.
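To make the "purely a type annotation" point concrete, here is a small sketch of a properties-only Protocol. The attribute names are illustrative guesses (the thread does not list the actual fields), and `ConnectionTelemetryInfoSketch`, `FakeConnection`, and `describe` are hypothetical:

```python
from typing import Optional, Protocol, Tuple


class ConnectionTelemetryInfoSketch(Protocol):
    # Illustrative attributes only; the PR's real field list is not shown here.
    id: int
    address: Tuple[str, int]
    server_connection_id: Optional[int]


class FakeConnection:
    """Does not inherit from the Protocol; it just has matching attributes."""

    def __init__(self, id: int, address: Tuple[str, int]) -> None:
        self.id = id
        self.address = address
        self.server_connection_id: Optional[int] = None


def describe(conn: ConnectionTelemetryInfoSketch) -> str:
    # A type checker accepts any object with the attributes declared above.
    host, port = conn.address
    return f"conn {conn.id} -> {host}:{port}"
```

Because Protocols are checked structurally, the connection classes would not need to subclass anything for the annotation to hold, which is why inheritance does not come into it.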
    :param max_doc_size: The largest document size in the batch, passed to ``conn.send_message``.
    :param unacknowledged: When ``True``, send only and fake an ``{"ok": 1}`` reply.
    """
    topology_id = client._topology_settings._topology_id if client is not None else None
Should we add a private _topology_id attribute on MongoClient at this point?
Makes sense, I had that in my previous PR at one point
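One way the suggestion could be sketched, using hypothetical stand-in classes rather than the real MongoClient: a private read-only property hides the two-level attribute chain from call sites.

```python
from typing import Optional


class TopologySettingsSketch:
    """Hypothetical stand-in for the client's topology settings object."""

    def __init__(self, topology_id: str) -> None:
        self._topology_id = topology_id


class ClientSketch:
    """Hypothetical stand-in for MongoClient."""

    def __init__(self, settings: TopologySettingsSketch) -> None:
        self._topology_settings = settings

    @property
    def _topology_id(self) -> str:
        # Call sites no longer reach through _topology_settings themselves.
        return self._topology_settings._topology_id


def topology_id_for(client: Optional[ClientSketch]) -> Optional[str]:
    return client._topology_id if client is not None else None
```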
    def started(self, orig: MutableMapping[str, Any], ensure_db: bool) -> None:
        """Emit the STARTED log entry and APM event, and start the duration clock."""
        self._start = datetime.datetime.now()
Nit: this (and the similar lines in the other methods) should be after the if not self.active check.
telemetry.duration is accessed unconditionally at the end of _run_command (always needed for the Response object), so _start must be set before any early-return path. If we moved self._start = datetime.datetime.now() to after the _active check in started(), telemetry.duration would raise AttributeError on every non-APM/non-logging command. The same applies to the _duration assignments in succeeded()/failed().
What the heck, I have "Ask permissions" turned on, and this is twice now it just replied. 😠
But yeah, even without logging we need duration upstream
Can't trust the robots man
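The ordering question in this thread can be shown with a small sketch, assuming (as stated above) that the duration is needed even when neither logging nor APM is enabled. `DurationSketch` and its `emitted` list are hypothetical:

```python
import datetime
from typing import List


class DurationSketch:
    def __init__(self, active: bool) -> None:
        self._active = active
        self.emitted: List[str] = []

    def started(self) -> None:
        # Clock starts before the guard so an inactive helper still has _start.
        self._start = datetime.datetime.now()
        if not self._active:
            return
        self.emitted.append("STARTED")

    def succeeded(self) -> None:
        # Duration is likewise computed before the early return.
        self.duration = datetime.datetime.now() - self._start
        if not self._active:
            return
        self.emitted.append("SUCCEEDED")
```

If the two assignments were moved below the guards, reading `duration` on an inactive instance would raise AttributeError, which matches the concern raised in the reply.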
PYTHON-5745
Changes in this PR
Introduces pymongo/_telemetry.py with a _CommandTelemetry class that unifies structured logging and APM event publishing for command lifecycle events into a single internal API. This refactor is a stepping stone to adding OpenTelemetry support in _telemetry.py, so that all event-related handling is in one centralized location.
Test Plan
No new tests: this is a behavior-preserving refactor. The existing APM / command-monitoring and command-logging spec suites assert the exact event and log documents that any regression would change.