diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index e6d8e0485..451f22164 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -151,6 +151,9 @@ jobs: cd dapr_runtime ./dist/linux_amd64/release/placement --healthz-port 9091 & cd .. - - name: Check Examples + - name: Check examples run: | tox -e examples + - name: Run integration tests + run: | + tox -e integration diff --git a/AGENTS.md b/AGENTS.md index db0790606..d1c67c21e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -33,7 +33,9 @@ ext/ # Extension packages (each is a separate PyPI packa └── flask_dapr/ # Flask integration ← see ext/flask_dapr/AGENTS.md tests/ # Unit tests (mirrors dapr/ package structure) -examples/ # Integration test suite ← see examples/AGENTS.md +├── examples/ # Output-based tests that run examples and check stdout +├── integration/ # Programmatic SDK tests using DaprClient directly +examples/ # User-facing example applications ← see examples/AGENTS.md docs/ # Sphinx documentation source tools/ # Build and release scripts ``` @@ -59,17 +61,19 @@ Each extension is a **separate PyPI package** with its own `setup.cfg`, `setup.p | `dapr-ext-langgraph` | `dapr.ext.langgraph` | LangGraph checkpoint persistence to Dapr state store | Moderate | | `dapr-ext-strands` | `dapr.ext.strands` | Strands agent session management via Dapr state store | New | -## Examples (integration test suite) +## Examples and testing -The `examples/` directory serves as both user-facing documentation and the project's integration test suite. Examples are validated in CI using [mechanical-markdown](https://pypi.org/project/mechanical-markdown/), which executes bash code blocks from README files and asserts expected output. +The `examples/` directory contains user-facing example applications. 
These are validated by two test suites: -**See `examples/AGENTS.md`** for the full guide on example structure, validation, mechanical-markdown STEP blocks, and how to add new examples. +- **`tests/examples/`** — Output-based tests that run examples via `dapr run` and check stdout for expected strings. Uses a `DaprRunner` helper to manage process lifecycle. See `examples/AGENTS.md`. +- **`tests/integration/`** — Programmatic SDK tests that call `DaprClient` methods directly and assert on return values, gRPC status codes, and SDK types. More reliable than output-based tests since they don't depend on print statement formatting. See `tests/integration/AGENTS.md`. Quick reference: ```bash -tox -e examples # Run all examples (needs Dapr runtime) -tox -e example-component -- state_store # Run a single example -cd examples && ./validate.sh state_store # Run directly +tox -e examples # Run output-based example tests +tox -e examples -- test_state_store.py # Run a single example test +tox -e integration # Run programmatic SDK tests +tox -e integration -- test_state_store.py # Run a single integration test ``` ## Python version support @@ -107,8 +111,11 @@ tox -e ruff # Run type checking tox -e type -# Validate examples (requires Dapr runtime) +# Run output-based example tests (requires Dapr runtime) tox -e examples + +# Run programmatic integration tests (requires Dapr runtime) +tox -e integration ``` To run tests directly without tox: @@ -190,9 +197,8 @@ When completing any task on this project, work through this checklist. 
Not every ### Examples (integration tests) - [ ] If you added a new user-facing feature or building block, add or update an example in `examples/` -- [ ] Ensure the example README has `` blocks with `expected_stdout_lines` so it is validated in CI -- [ ] If you added a new example, register it in `tox.ini` under `[testenv:examples]` -- [ ] If you changed output format of existing functionality, update `expected_stdout_lines` in affected example READMEs +- [ ] Add a corresponding pytest test in `tests/examples/` (output-based) and/or `tests/integration/` (programmatic) +- [ ] If you changed output format of existing functionality, update expected output in `tests/examples/` - [ ] See `examples/AGENTS.md` for full details on writing examples ### Documentation @@ -204,7 +210,7 @@ When completing any task on this project, work through this checklist. Not every - [ ] Run `tox -e ruff` — linting must be clean - [ ] Run `tox -e py311` (or your Python version) — all unit tests must pass -- [ ] If you touched examples: `tox -e example-component -- ` to validate locally +- [ ] If you touched examples: `tox -e examples -- test_.py` to validate locally - [ ] Commits must be signed off for DCO: `git commit -s` ## Important files @@ -219,7 +225,8 @@ When completing any task on this project, work through this checklist. Not every | `dev-requirements.txt` | Development/test dependencies | | `dapr/version/__init__.py` | SDK version string | | `ext/*/setup.cfg` | Extension package metadata and dependencies | -| `examples/validate.sh` | Entry point for mechanical-markdown example validation | +| `tests/examples/` | Output-based tests that validate examples by checking stdout | +| `tests/integration/` | Programmatic SDK tests using DaprClient directly | ## Gotchas @@ -228,6 +235,6 @@ When completing any task on this project, work through this checklist. Not every - **Extension independence**: Each extension is a separate PyPI package. 
Core SDK changes should not break extensions; extension changes should not require core SDK changes unless intentional. - **DCO signoff**: PRs will be blocked by the DCO bot if commits lack `Signed-off-by`. Always use `git commit -s`. - **Ruff version pinned**: Dev requirements pin `ruff === 0.14.1`. Use this exact version to match CI. -- **Examples are integration tests**: Changing output format (log messages, print statements) can break example validation. Always check `expected_stdout_lines` in example READMEs when modifying user-visible output. +- **Examples are tested by output matching**: Changing output format (log messages, print statements) can break `tests/examples/`. Always check expected output there when modifying user-visible output. - **Background processes in examples**: Examples that start background services (servers, subscribers) must include a cleanup step to stop them, or CI will hang. - **Workflow is the most active area**: See `ext/dapr-ext-workflow/AGENTS.md` for workflow-specific architecture and constraints. diff --git a/CLAUDE.md b/CLAUDE.md index 43c994c2d..84e2b1166 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1 +1,11 @@ @AGENTS.md + +Use pathlib instead of os.path. +Use modern Python (3.10+) features. +Make all code strongly typed. +Keep conditional nesting to a minimum, and use guard clauses when possible. +Aim for medium "visual complexity": use intermediate variables to store results of nested/complex function calls, but don't create a new variable for everything. +Avoid comments unless there is an unusual gotcha, a complex algorithm or anything an experienced code reviewer needs to be aware of. Focus on making better Google-style docstrings instead. + +The user is not always right. Be skeptical and do not blindly comply if something doesn't make sense. +Code should be production-ready. 
\ No newline at end of file diff --git a/README.md b/README.md index e212cbd43..f6b203412 100644 --- a/README.md +++ b/README.md @@ -121,19 +121,17 @@ tox -e py311 tox -e type ``` -8. Run examples +8. Run integration tests (validates the examples) ```bash tox -e examples ``` -[Dapr Mechanical Markdown](https://github.com/dapr/mechanical-markdown) is used to test the examples. - If you need to run the examples against a pre-released version of the runtime, you can use the following command: - Get your daprd runtime binary from [here](https://github.com/dapr/dapr/releases) for your platform. - Copy the binary to your dapr home folder at $HOME/.dapr/bin/daprd. Or using dapr cli directly: `dapr init --runtime-version ` -- Now you can run the example with `tox -e examples`. +- Now you can run the examples with `tox -e integration`. ## Documentation diff --git a/examples/AGENTS.md b/examples/AGENTS.md index 677470d60..36bd171ef 100644 --- a/examples/AGENTS.md +++ b/examples/AGENTS.md @@ -1,14 +1,13 @@ # AGENTS.md — Dapr Python SDK Examples -The `examples/` directory serves as both **user-facing documentation** and the project's **integration test suite**. Each example is a self-contained application validated automatically in CI using [mechanical-markdown](https://pypi.org/project/mechanical-markdown/), which executes bash code blocks embedded in README files and asserts expected output. +The `examples/` directory serves as the **user-facing documentation**. Each example is a self-contained application validated by pytest-based tests in `tests/examples/`. ## How validation works -1. `examples/validate.sh` is the entry point — it `cd`s into an example directory and runs `mm.py -l README.md` -2. `mm.py` (mechanical-markdown) parses `` HTML comment blocks in the README -3. Each STEP block wraps a fenced bash code block that gets executed -4. stdout/stderr is captured and checked against `expected_stdout_lines` / `expected_stderr_lines` -5. 
Validation fails if any expected output line is missing +1. Each example has a corresponding test file in `tests/examples/` (e.g., `test_state_store.py`) +2. Tests use a `DaprRunner` helper (defined in `tests/examples/conftest.py`) that wraps `dapr run` commands +3. `DaprRunner.run()` executes a command and captures stdout; `DaprRunner.start()`/`stop()` manage background services +4. Tests assert that expected output lines appear in the captured output Run examples locally (requires a running Dapr runtime via `dapr init`): @@ -17,10 +16,7 @@ Run examples locally (requires a running Dapr runtime via `dapr init`): tox -e examples # Single example -tox -e example-component -- state_store - -# Or directly -cd examples && ./validate.sh state_store +tox -e examples -- test_state_store.py ``` In CI (`validate_examples.yaml`), examples run on all supported Python versions (3.10-3.14) on Ubuntu with a full Dapr runtime including Docker, Redis, and (for LLM examples) Ollama. @@ -31,7 +27,7 @@ Each example follows this pattern: ``` examples// -├── README.md # Documentation + mechanical-markdown STEP blocks (REQUIRED) +├── README.md # Documentation (REQUIRED) ├── *.py # Python application files ├── requirements.txt # Dependencies (optional — many examples rely on the installed SDK) ├── components/ # Dapr component YAML configs (if needed) @@ -46,53 +42,6 @@ Common Python file naming conventions: - Client/caller side: `*-caller.py`, `publisher.py`, `*_client.py` - Standalone: `state_store.py`, `crypto.py`, etc. 
-## Mechanical-markdown STEP block format - -STEP blocks are HTML comments wrapping fenced bash code in the README: - -````markdown - - -```bash -dapr run --app-id myapp --resources-path ./components/ python3 example.py -``` - - -```` - -### STEP block attributes - -| Attribute | Description | -|-----------|-------------| -| `name` | Descriptive name for the step | -| `expected_stdout_lines` | List of strings that must appear in stdout | -| `expected_stderr_lines` | List of strings that must appear in stderr | -| `background` | `true` to run in background (for long-running services) | -| `sleep` | Seconds to wait after starting before moving to the next step | -| `timeout_seconds` | Max seconds before the step is killed | -| `output_match_mode` | `substring` for partial matching (default is exact) | -| `match_order` | `none` if output lines can appear in any order | - -### Tips for writing STEP blocks - -- Use `background: true` with `sleep:` for services that need to stay running (servers, subscribers) -- Use `timeout_seconds:` to prevent CI hangs on broken examples -- Use `output_match_mode: substring` when output contains timestamps or dynamic content -- Use `match_order: none` when multiple concurrent operations produce unpredictable ordering -- Always include a cleanup step (e.g., `dapr stop --app-id ...`) when using background processes -- Make `expected_stdout_lines` specific enough to validate correctness, but not so brittle they break on cosmetic changes -- Dapr prefixes app output with `== APP ==` — use this in expected lines - ## Dapr component YAML format Components in `components/` directories follow the standard Dapr resource format: @@ -182,69 +131,18 @@ The `workflow` example includes: `simple.py`, `task_chaining.py`, `fan_out_fan_i 1. Create a directory under `examples/` with a descriptive kebab-case name 2. Add Python source files and a `requirements.txt` referencing the needed SDK packages 3. 
Add Dapr component YAMLs in a `components/` subdirectory if the example uses state, pubsub, etc. -4. Write a `README.md` with: - - Introduction explaining what the example demonstrates - - Pre-requisites section (Dapr CLI, Python 3.10+, any special tools) - - Install instructions (`pip3 install dapr dapr-ext-grpc` etc.) - - Running instructions with `` blocks wrapping `dapr run` commands - - Expected output section - - Cleanup step to stop background processes -5. Register the example in `tox.ini` under `[testenv:examples]` commands: - ``` - ./validate.sh your-example-name - ``` -6. Test locally: `cd examples && ./validate.sh your-example-name` - -## Common README template - -```markdown -# Dapr [Building Block] Example - -This example demonstrates how to use the Dapr [building block] API with the Python SDK. - -## Pre-requisites - -- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started) -- Python 3.10+ - -## Install Dapr python-SDK - -\`\`\`bash -pip3 install dapr dapr-ext-grpc -\`\`\` - -## Run the example - - - -\`\`\`bash -dapr run --app-id myapp --resources-path ./components/ python3 example.py -\`\`\` - - - -## Cleanup - - - -\`\`\`bash -dapr stop --app-id myapp -\`\`\` - - -``` +4. Write a `README.md` with introduction, pre-requisites, install instructions, and running instructions +5. Add a corresponding test in `tests/examples/test_.py`: + - Use the `@pytest.mark.example_dir('')` marker to set the working directory + - Use `dapr.run()` for scripts that exit on their own, `dapr.start()`/`dapr.stop()` for long-running services + - Assert expected output lines appear in the captured output +6. Test locally: `tox -e examples -- test_.py` ## Gotchas -- **Output format changes break CI**: If you modify print statements or log output in SDK code, check whether any example's `expected_stdout_lines` depend on that output. -- **Background processes must be cleaned up**: Missing cleanup steps cause CI to hang. 
+- **Output format changes break tests**: If you modify print statements or log output in SDK code, check whether any test's expected lines in `tests/examples/` depend on that output. +- **Background processes must be cleaned up**: The `DaprRunner` fixture handles cleanup on teardown, but tests should still call `dapr.stop()` to capture output. - **Dapr prefixes output**: Application stdout appears as `== APP == ` when run via `dapr run`. - **Redis is available in CI**: The CI environment has Redis running on `localhost:6379` — most component YAMLs use this. - **Some examples need special setup**: `crypto` generates keys, `configuration` seeds Redis, `conversation` needs LLM config — check individual READMEs. +- **Infinite-loop example scripts**: Some example scripts (e.g., `invoke-caller.py`) have `while True` loops for demo purposes. Tests must either bypass these with HTTP API calls or use `dapr.run(until=...)` for early termination. \ No newline at end of file diff --git a/examples/validate.sh b/examples/validate.sh deleted file mode 100755 index 202fcaedd..000000000 --- a/examples/validate.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -echo "Home: $HOME" - -cd $1 && mm.py -l README.md diff --git a/pyproject.toml b/pyproject.toml index 2186f5074..f2dc537f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,3 +22,8 @@ ignore = ["E501","E203", "E712", "E722", "E713"] [tool.ruff.format] quote-style = 'single' + +[tool.pytest.ini_options] +markers = [ + 'example_dir(name): set the example directory for the dapr fixture', +] diff --git a/tests/examples/conftest.py b/tests/examples/conftest.py new file mode 100644 index 000000000..4c1c9d22c --- /dev/null +++ b/tests/examples/conftest.py @@ -0,0 +1,147 @@ +import shlex +import subprocess +import tempfile +import threading +import time +from pathlib import Path +from typing import IO, Any, Generator + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +EXAMPLES_DIR = REPO_ROOT / 'examples' + 
+ +def pytest_configure(config: pytest.Config) -> None: + config.addinivalue_line('markers', 'example_dir(name): set the example directory for a test') + + +class DaprRunner: + """Helper to run `dapr run` commands and capture output.""" + + def __init__(self, cwd: Path) -> None: + self._cwd = cwd + self._bg_process: subprocess.Popen[str] | None = None + self._bg_output_file: IO[str] | None = None + + def _spawn(self, args: str) -> subprocess.Popen[str]: + return subprocess.Popen( + args=('dapr', 'run', *shlex.split(args)), + cwd=self._cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + + @staticmethod + def _terminate(proc: subprocess.Popen[str]) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + + def run(self, args: str, *, timeout: int = 30, until: list[str] | None = None) -> str: + """Run a foreground command, block until it finishes, and return output. + + Use this for short-lived processes (e.g. a publisher that exits on its + own). For long-lived background services, use ``start()``/``stop()``. + + Args: + args: Arguments passed to ``dapr run``. + timeout: Maximum seconds to wait before killing the process. + until: If provided, the process is terminated as soon as every + string in this list has appeared in the accumulated output. + """ + proc = self._spawn(args) + lines: list[str] = [] + remaining = set(until) if until else set() + assert proc.stdout is not None + + # Kill the process if it exceeds the timeout. A background timer is + # needed because `for line in proc.stdout` blocks indefinitely when + # the child never exits. 
+ timer = threading.Timer(interval=timeout, function=proc.kill) + timer.start() + + try: + for line in proc.stdout: + print(line, end='', flush=True) + lines.append(line) + if remaining: + output_so_far = ''.join(lines) + remaining = {exp for exp in remaining if exp not in output_so_far} + if not remaining: + break + finally: + timer.cancel() + self._terminate(proc) + + return ''.join(lines) + + def start(self, args: str, *, wait: int = 5) -> subprocess.Popen[str]: + """Start a long-lived background service and return the process handle. + + Use this for servers/subscribers that must stay alive while a second + process runs via ``run()``. Call ``stop()`` to terminate and collect + output. Stdout is written to a temp file to avoid pipe-buffer deadlocks. + """ + output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.log', delete=False) + proc = subprocess.Popen( + args=('dapr', 'run', *shlex.split(args)), + cwd=self._cwd, + stdout=output_file, + stderr=subprocess.STDOUT, + text=True, + ) + self._bg_process = proc + self._bg_output_file = output_file + time.sleep(wait) + return proc + + def stop(self, proc: subprocess.Popen[str]) -> str: + """Stop a background process and return its captured output.""" + self._terminate(proc) + self._bg_process = None + return self._read_and_close_output() + + def cleanup(self) -> None: + """Stop the background process if still running.""" + if self._bg_process is not None: + self._terminate(self._bg_process) + self._bg_process = None + self._read_and_close_output() + + def _read_and_close_output(self) -> str: + if self._bg_output_file is None: + return '' + output_path = Path(self._bg_output_file.name) + self._bg_output_file.close() + self._bg_output_file = None + output = output_path.read_text() + output_path.unlink(missing_ok=True) + print(output, end='', flush=True) + return output + + +@pytest.fixture +def dapr(request: pytest.FixtureRequest) -> Generator[DaprRunner, Any, None]: + """Provides a DaprRunner scoped to an 
example directory. + + Use the ``example_dir`` marker to select which example: + + @pytest.mark.example_dir('state_store') + def test_something(dapr): + ... + + Defaults to the examples root if no marker is set. + """ + marker = request.node.get_closest_marker('example_dir') + cwd = EXAMPLES_DIR / marker.args[0] if marker else EXAMPLES_DIR + + runner = DaprRunner(cwd) + yield runner + runner.cleanup() diff --git a/tests/examples/test_configuration.py b/tests/examples/test_configuration.py new file mode 100644 index 000000000..8e72d0788 --- /dev/null +++ b/tests/examples/test_configuration.py @@ -0,0 +1,52 @@ +import subprocess +import time + +import pytest + +REDIS_CONTAINER = 'dapr_redis' + +EXPECTED_LINES = [ + 'Got key=orderId1 value=100 version=1 metadata={}', + 'Got key=orderId2 value=200 version=1 metadata={}', + 'Subscribe key=orderId2 value=210 version=2 metadata={}', + 'Unsubscribed successfully? True', +] + + +@pytest.fixture() +def redis_config(): + """Seed configuration values in Redis before the test.""" + subprocess.run( + f'docker exec {REDIS_CONTAINER} redis-cli SET orderId1 "100||1"', + shell=True, + check=True, + capture_output=True, + ) + subprocess.run( + f'docker exec {REDIS_CONTAINER} redis-cli SET orderId2 "200||1"', + shell=True, + check=True, + capture_output=True, + ) + + +@pytest.mark.example_dir('configuration') +def test_configuration(dapr, redis_config): + proc = dapr.start( + '--app-id configexample --resources-path components/ -- python3 configuration.py', + wait=5, + ) + # Update Redis to trigger the subscription notification + subprocess.run( + f'docker exec {REDIS_CONTAINER} redis-cli SET orderId2 "210||2"', + shell=True, + check=True, + capture_output=True, + ) + # configuration.py sleeps 10s after subscribing before it unsubscribes. + # Wait long enough for the full script to finish. 
+ time.sleep(10) + + output = dapr.stop(proc) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_conversation.py b/tests/examples/test_conversation.py new file mode 100644 index 000000000..76ec5ade0 --- /dev/null +++ b/tests/examples/test_conversation.py @@ -0,0 +1,28 @@ +import pytest + +EXPECTED_LINES = [ + "Result: What's Dapr?", + 'Give a brief overview.', +] + + +@pytest.mark.example_dir('conversation') +def test_conversation_alpha1(dapr): + output = dapr.run( + '--app-id conversation-alpha1 --log-level debug --resources-path ./config ' + '-- python3 conversation_alpha1.py', + timeout=60, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('conversation') +def test_conversation_alpha2(dapr): + output = dapr.run( + '--app-id conversation-alpha2 --log-level debug --resources-path ./config ' + '-- python3 conversation_alpha2.py', + timeout=60, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_crypto.py b/tests/examples/test_crypto.py new file mode 100644 index 000000000..1b7a4c527 --- /dev/null +++ b/tests/examples/test_crypto.py @@ -0,0 +1,64 @@ +import shutil +import subprocess +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +CRYPTO_DIR = REPO_ROOT / 'examples' / 'crypto' + +EXPECTED_COMMON = [ + 'Running encrypt/decrypt operation on string', + 'Decrypted the message, got 24 bytes', + 'The secret is "passw0rd"', + 'Running encrypt/decrypt operation on file', + 'Wrote encrypted data to encrypted.out', + 'Wrote decrypted data to decrypted.out.jpg', +] + + +@pytest.fixture() +def crypto_keys(): + keys_dir = CRYPTO_DIR / 'keys' + keys_dir.mkdir(exist_ok=True) + subprocess.run( + 'openssl genpkey -algorithm RSA -pkeyopt rsa_keygen_bits:4096 ' + '-out keys/rsa-private-key.pem', + shell=True, + cwd=CRYPTO_DIR, + 
check=True, + capture_output=True, + ) + subprocess.run( + 'openssl rand -out keys/symmetric-key-256 32', + shell=True, + cwd=CRYPTO_DIR, + check=True, + capture_output=True, + ) + yield + shutil.rmtree(keys_dir, ignore_errors=True) + (CRYPTO_DIR / 'encrypted.out').unlink(missing_ok=True) + (CRYPTO_DIR / 'decrypted.out.jpg').unlink(missing_ok=True) + + +@pytest.mark.example_dir('crypto') +def test_crypto(dapr, crypto_keys): + output = dapr.run( + '--app-id crypto --resources-path ./components/ -- python3 crypto.py', + timeout=30, + ) + assert 'Running gRPC client synchronous API' in output + for line in EXPECTED_COMMON: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('crypto') +def test_crypto_async(dapr, crypto_keys): + output = dapr.run( + '--app-id crypto-async --resources-path ./components/ -- python3 crypto-async.py', + timeout=30, + ) + assert 'Running gRPC client asynchronous API' in output + for line in EXPECTED_COMMON: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_demo_actor.py b/tests/examples/test_demo_actor.py new file mode 100644 index 000000000..bef8e5476 --- /dev/null +++ b/tests/examples/test_demo_actor.py @@ -0,0 +1,45 @@ +import pytest + +EXPECTED_SERVICE = [ + 'Activate DemoActor actor!', + 'has_value: False', + "set_my_data: {'data': 'new_data'}", + 'has_value: True', + 'set reminder to True', + 'set reminder is done', + 'set_timer to True', + 'set_timer is done', + 'clear_my_data', +] + +EXPECTED_CLIENT = [ + 'call actor method via proxy.invoke_method()', + "b'null'", + 'call actor method using rpc style', + 'None', + 'call SetMyData actor method to save the state', + 'call GetMyData actor method to get the state', + 'Register reminder', + 'Register timer', + 'stop reminder', + 'stop timer', + 'clear actor state', +] + + +@pytest.mark.example_dir('demo_actor/demo_actor') +def test_demo_actor(dapr): + service = dapr.start( + '--app-id demo-actor --app-port 3000 -- 
uvicorn --port 3000 demo_actor_service:app', + wait=10, + ) + client_output = dapr.run( + '--app-id demo-client python3 demo_actor_client.py', + timeout=60, + ) + for line in EXPECTED_CLIENT: + assert line in client_output, f'Missing in client output: {line}' + + service_output = dapr.stop(service) + for line in EXPECTED_SERVICE: + assert line in service_output, f'Missing in service output: {line}' diff --git a/tests/examples/test_distributed_lock.py b/tests/examples/test_distributed_lock.py new file mode 100644 index 000000000..1d353484d --- /dev/null +++ b/tests/examples/test_distributed_lock.py @@ -0,0 +1,21 @@ +import pytest + +EXPECTED_LINES = [ + 'Will try to acquire a lock from lock store named [lockstore]', + 'The lock is for a resource named [example-lock-resource]', + 'The client identifier is [example-client-id]', + 'The lock will will expire in 60 seconds.', + 'Lock acquired successfully!!!', + 'We already released the lock so unlocking will not work.', + 'We tried to unlock it anyway and got back [UnlockResponseStatus.lock_does_not_exist]', +] + + +@pytest.mark.example_dir('distributed_lock') +def test_distributed_lock(dapr): + output = dapr.run( + '--app-id=locksapp --app-protocol grpc --resources-path components/ python3 lock.py', + timeout=10, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_error_handling.py b/tests/examples/test_error_handling.py new file mode 100644 index 000000000..68c46af0e --- /dev/null +++ b/tests/examples/test_error_handling.py @@ -0,0 +1,22 @@ +import pytest + +EXPECTED_LINES = [ + 'Status code: StatusCode.INVALID_ARGUMENT', + "Message: input key/keyPrefix 'key||' can't contain '||'", + 'Error code: DAPR_STATE_ILLEGAL_KEY', + 'Error info(reason): DAPR_STATE_ILLEGAL_KEY', + 'Resource info (resource type): state', + 'Resource info (resource name): statestore', + 'Bad request (field): key||', + "Bad request (description): input key/keyPrefix 'key||' can't 
contain '||'", +] + + +@pytest.mark.example_dir('error_handling') +def test_error_handling(dapr): + output = dapr.run( + '--resources-path components -- python3 error_handling.py', + timeout=10, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_grpc_proxying.py b/tests/examples/test_grpc_proxying.py new file mode 100644 index 000000000..a59f03685 --- /dev/null +++ b/tests/examples/test_grpc_proxying.py @@ -0,0 +1,22 @@ +import pytest + +EXPECTED_CALLER = [ + 'Greeter client received: Hello, you!', +] + + +@pytest.mark.example_dir('grpc_proxying') +def test_grpc_proxying(dapr): + receiver = dapr.start( + '--app-id invoke-receiver --app-protocol grpc --app-port 50051 ' + '--config config.yaml -- python invoke-receiver.py', + wait=5, + ) + caller_output = dapr.run( + '--app-id invoke-caller --dapr-grpc-port 50007 --config config.yaml -- python invoke-caller.py', + timeout=30, + ) + for line in EXPECTED_CALLER: + assert line in caller_output, f'Missing in caller output: {line}' + + dapr.stop(receiver) diff --git a/tests/examples/test_invoke_binding.py b/tests/examples/test_invoke_binding.py new file mode 100644 index 000000000..fece88487 --- /dev/null +++ b/tests/examples/test_invoke_binding.py @@ -0,0 +1,63 @@ +import subprocess +import time +from pathlib import Path + +import httpx +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +BINDING_DIR = REPO_ROOT / 'examples' / 'invoke-binding' + +EXPECTED_MESSAGES = [ + '{"id":1,"message":"hello world"}', + '{"id":2,"message":"hello world"}', + '{"id":3,"message":"hello world"}', +] + + +@pytest.fixture() +def kafka(): + subprocess.run( + 'docker compose -f ./docker-compose-single-kafka.yml up -d', + shell=True, + cwd=BINDING_DIR, + check=True, + capture_output=True, + ) + time.sleep(30) + yield + subprocess.run( + 'docker compose -f ./docker-compose-single-kafka.yml down', + shell=True, + cwd=BINDING_DIR, + check=True, + 
capture_output=True, + ) + + +@pytest.mark.example_dir('invoke-binding') +def test_invoke_binding(dapr, kafka): + receiver = dapr.start( + '--app-id receiver --app-protocol grpc --app-port 50051 ' + '--dapr-http-port 3500 --resources-path ./components python3 invoke-input-binding.py', + wait=5, + ) + + # Publish through the receiver's sidecar (both scripts are infinite, + # so we reimplement the publisher here with a bounded loop). + for n in range(1, 4): + payload = { + 'operation': 'create', + 'data': {'id': n, 'message': 'hello world'}, + } + response = httpx.post( + 'http://localhost:3500/v1.0/bindings/kafkaBinding', json=payload, timeout=5 + ) + response.raise_for_status() + + time.sleep(1) + + time.sleep(5) + receiver_output = dapr.stop(receiver) + for line in EXPECTED_MESSAGES: + assert line in receiver_output, f'Missing in receiver output: {line}' diff --git a/tests/examples/test_invoke_custom_data.py b/tests/examples/test_invoke_custom_data.py new file mode 100644 index 000000000..11acdc106 --- /dev/null +++ b/tests/examples/test_invoke_custom_data.py @@ -0,0 +1,29 @@ +import pytest + +EXPECTED_RECEIVER = [ + 'SOME_DATA', +] + +EXPECTED_CALLER = [ + 'isSuccess: true', + 'code: 200', + 'message: "Hello World - Success!"', +] + + +@pytest.mark.example_dir('invoke-custom-data') +def test_invoke_custom_data(dapr): + receiver = dapr.start( + '--app-id invoke-receiver --app-protocol grpc --app-port 50051 python3 invoke-receiver.py', + wait=5, + ) + caller_output = dapr.run( + '--app-id invoke-caller --app-protocol grpc python3 invoke-caller.py', + timeout=30, + ) + for line in EXPECTED_CALLER: + assert line in caller_output, f'Missing in caller output: {line}' + + receiver_output = dapr.stop(receiver) + for line in EXPECTED_RECEIVER: + assert line in receiver_output, f'Missing in receiver output: {line}' diff --git a/tests/examples/test_invoke_http.py b/tests/examples/test_invoke_http.py new file mode 100644 index 000000000..a7d82ac23 --- /dev/null +++ 
b/tests/examples/test_invoke_http.py @@ -0,0 +1,34 @@ +import pytest + +EXPECTED_RECEIVER = [ + 'Order received : {"id": 1, "message": "hello world"}', + 'Order error : {"id": 2, "message": "hello world"}', +] + +EXPECTED_CALLER = [ + 'text/html', + '{"success": true}', + '200', + 'error occurred', + 'MY_CODE', + '503', +] + + +@pytest.mark.example_dir('invoke-http') +def test_invoke_http(dapr): + receiver = dapr.start( + '--app-id invoke-receiver --app-port 8088 --app-protocol http ' + '-- python3 invoke-receiver.py', + wait=5, + ) + caller_output = dapr.run( + '--app-id invoke-caller -- python3 invoke-caller.py', + timeout=30, + ) + for line in EXPECTED_CALLER: + assert line in caller_output, f'Missing in caller output: {line}' + + receiver_output = dapr.stop(receiver) + for line in EXPECTED_RECEIVER: + assert line in receiver_output, f'Missing in receiver output: {line}' diff --git a/tests/examples/test_invoke_simple.py b/tests/examples/test_invoke_simple.py new file mode 100644 index 000000000..57e809fdf --- /dev/null +++ b/tests/examples/test_invoke_simple.py @@ -0,0 +1,31 @@ +import httpx +import pytest + +EXPECTED_RECEIVER = [ + '{"id":1,"message":"hello world"}', +] + + +@pytest.mark.example_dir('invoke-simple') +def test_invoke_simple(dapr): + receiver = dapr.start( + '--app-id invoke-receiver --app-protocol grpc --app-port 50051 ' + '--dapr-http-port 3500 python3 invoke-receiver.py', + wait=5, + ) + + # invoke-caller.py runs an infinite loop, so we invoke the method + # directly through the sidecar HTTP API with a single call. 
+ resp = httpx.post( + 'http://localhost:3500/v1.0/invoke/invoke-receiver/method/my-method', + json={'id': 1, 'message': 'hello world'}, + timeout=5, + ) + resp.raise_for_status() + + assert 'text/plain' in resp.headers.get('content-type', '') + assert 'INVOKE_RECEIVED' in resp.text + + receiver_output = dapr.stop(receiver) + for line in EXPECTED_RECEIVER: + assert line in receiver_output, f'Missing in receiver output: {line}' diff --git a/tests/examples/test_jobs.py b/tests/examples/test_jobs.py new file mode 100644 index 000000000..61eb67560 --- /dev/null +++ b/tests/examples/test_jobs.py @@ -0,0 +1,50 @@ +import pytest + +EXPECTED_MANAGEMENT = [ + '0. Scheduling a simple job without data...', + 'Simple job scheduled successfully', + '1. Scheduling a recurring job with cron schedule...', + 'Recurring job scheduled successfully', + '2. Scheduling a one-time job with due_time...', + 'One-time job scheduled successfully', + '3. Scheduling jobs with failure policies...', + 'Job with drop failure policy scheduled successfully', + 'Job with constant retry policy scheduled successfully', + '4. Getting job details...', + 'Retrieved job details:', + '5. Cleaning up - deleting jobs...', + 'Deleted job: simple-job', + 'Deleted job: recurring-hello-job', + 'Deleted job: one-time-hello-job', + 'Deleted job: drop-policy-job', + 'Deleted job: retry-policy-job', +] + +EXPECTED_PROCESSING = [ + 'Dapr Jobs Example', + 'Starting gRPC server on port 50051...', + 'Scheduling jobs...', + 'hello-job scheduled', + 'data-job scheduled', + 'Jobs scheduled! 
Waiting for execution...', + 'Job event received: hello-job', + 'Data job event received: data-job', +] + + +@pytest.mark.example_dir('jobs') +def test_job_management(dapr): + output = dapr.run('--app-id jobs-example -- python3 job_management.py', timeout=30) + for line in EXPECTED_MANAGEMENT: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('jobs') +def test_job_processing(dapr): + proc = dapr.start( + '--app-id jobs-workflow --app-protocol grpc --app-port 50051 python3 job_processing.py', + wait=15, + ) + output = dapr.stop(proc) + for line in EXPECTED_PROCESSING: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_langgraph_checkpointer.py b/tests/examples/test_langgraph_checkpointer.py new file mode 100644 index 000000000..89e4ab77b --- /dev/null +++ b/tests/examples/test_langgraph_checkpointer.py @@ -0,0 +1,75 @@ +import subprocess +import time + +import httpx +import pytest + +OLLAMA_URL = 'http://localhost:11434' +MODEL = 'llama3.2:3b' + +EXPECTED_LINES = [ + 'Add 3 and 4.', + '7', + '14', +] + + +def _ollama_ready() -> bool: + try: + return httpx.get(f'{OLLAMA_URL}/api/tags', timeout=2).is_success + except httpx.ConnectError: + return False + + +def _model_available() -> bool: + resp = httpx.get(f'{OLLAMA_URL}/api/tags', timeout=5) + return any(m['name'] == MODEL for m in resp.json().get('models', [])) + + +@pytest.fixture() +def ollama(): + """Ensure Ollama is running and the required model is pulled. + + Reuses a running instance if available, otherwise starts one for + the duration of the test. Skips if the ollama CLI is not installed. 
+ """ + started: subprocess.Popen[bytes] | None = None + if not _ollama_ready(): + try: + started = subprocess.Popen( + ['ollama', 'serve'], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except FileNotFoundError: + pytest.skip('ollama is not installed') + time.sleep(10) + + if not _model_available(): + subprocess.run(['ollama', 'pull', MODEL], check=True, capture_output=True) + + yield + + if started: + started.terminate() + started.wait() + + +@pytest.fixture() +def flush_redis(): + """This test is not replayable if the checkpointer state store is not clean.""" + subprocess.run( + ['docker', 'exec', 'dapr_redis', 'redis-cli', 'FLUSHDB'], + capture_output=True, + ) + + +@pytest.mark.example_dir('langgraph-checkpointer') +def test_langgraph_checkpointer(dapr, ollama, flush_redis): + output = dapr.run( + '--app-id langgraph-checkpointer --dapr-grpc-port 5002 ' + '--resources-path ./components -- python3 agent.py', + timeout=120, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_metadata.py b/tests/examples/test_metadata.py new file mode 100644 index 000000000..5beef49da --- /dev/null +++ b/tests/examples/test_metadata.py @@ -0,0 +1,22 @@ +import pytest + +EXPECTED_LINES = [ + 'First, we will assign a new custom label to Dapr sidecar', + "Now, we will fetch the sidecar's metadata", + 'And this is what we got:', + 'application_id: my-metadata-app', + 'active_actors_count: {}', + 'registered_components:', + 'We will update our custom label value and check it was persisted', + 'We added a custom label named [is-this-our-metadata-example]', +] + + +@pytest.mark.example_dir('metadata') +def test_metadata(dapr): + output = dapr.run( + '--app-id=my-metadata-app --app-protocol grpc --resources-path components/ python3 app.py', + timeout=10, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_pubsub_simple.py 
b/tests/examples/test_pubsub_simple.py new file mode 100644 index 000000000..1b4f84981 --- /dev/null +++ b/tests/examples/test_pubsub_simple.py @@ -0,0 +1,43 @@ +import pytest + +EXPECTED_SUBSCRIBER = [ + 'Subscriber received: id=1, message="hello world", content_type="application/json"', + 'Subscriber received: id=2, message="hello world", content_type="application/json"', + 'Subscriber received: id=3, message="hello world", content_type="application/json"', + 'Wildcard-Subscriber received: id=4, message="hello world", content_type="application/json"', + 'Wildcard-Subscriber received: id=5, message="hello world", content_type="application/json"', + 'Wildcard-Subscriber received: id=6, message="hello world", content_type="application/json"', + 'Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"', + 'Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD', + 'Dead-Letter Subscriber. Originally intended topic: TOPIC_D', + 'Subscriber received: TOPIC_CE', +] + +EXPECTED_PUBLISHER = [ + "{'id': 1, 'message': 'hello world'}", + "{'id': 2, 'message': 'hello world'}", + "{'id': 3, 'message': 'hello world'}", + 'Bulk published 3 events. 
Failed entries: 0', +] + + +@pytest.mark.example_dir('pubsub-simple') +def test_pubsub_simple(dapr): + subscriber = dapr.start( + '--app-id python-subscriber --app-protocol grpc --app-port 50051 -- python3 subscriber.py', + wait=5, + ) + publisher_output = dapr.run( + '--app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 ' + '--enable-app-health-check -- python3 publisher.py', + timeout=30, + ) + for line in EXPECTED_PUBLISHER: + assert line in publisher_output, f'Missing in publisher output: {line}' + + import time + + time.sleep(5) + subscriber_output = dapr.stop(subscriber) + for line in EXPECTED_SUBSCRIBER: + assert line in subscriber_output, f'Missing in subscriber output: {line}' diff --git a/tests/examples/test_pubsub_streaming.py b/tests/examples/test_pubsub_streaming.py new file mode 100644 index 000000000..81b3055bd --- /dev/null +++ b/tests/examples/test_pubsub_streaming.py @@ -0,0 +1,69 @@ +import time + +import pytest + +EXPECTED_SUBSCRIBER = [ + "Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A1...", + "Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A1...", + "Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A1...", + "Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A1...", + "Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A1...", + 'Closing subscription...', +] + +EXPECTED_HANDLER_SUBSCRIBER = [ + "Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A2...", + "Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A2...", + "Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A2...", + "Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A2...", + "Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A2...", + 'Closing subscription...', +] + +EXPECTED_PUBLISHER = [ + "{'id': 1, 'message': 'hello world'}", + "{'id': 2, 'message': 'hello world'}", + "{'id': 3, 'message': 
'hello world'}", + "{'id': 4, 'message': 'hello world'}", + "{'id': 5, 'message': 'hello world'}", +] + + +@pytest.mark.example_dir('pubsub-streaming') +def test_pubsub_streaming(dapr): + subscriber = dapr.start( + '--app-id python-subscriber --app-protocol grpc -- python3 subscriber.py --topic=TOPIC_A1', + wait=5, + ) + publisher_output = dapr.run( + '--app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 ' + '--enable-app-health-check -- python3 publisher.py --topic=TOPIC_A1', + timeout=30, + ) + for line in EXPECTED_PUBLISHER: + assert line in publisher_output, f'Missing in publisher output: {line}' + + time.sleep(5) + subscriber_output = dapr.stop(subscriber) + for line in EXPECTED_SUBSCRIBER: + assert line in subscriber_output, f'Missing in subscriber output: {line}' + + +@pytest.mark.example_dir('pubsub-streaming') +def test_pubsub_streaming_handler(dapr): + subscriber = dapr.start( + '--app-id python-subscriber --app-protocol grpc -- python3 subscriber-handler.py --topic=TOPIC_A2', + wait=5, + ) + publisher_output = dapr.run( + '--app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 ' + '--enable-app-health-check -- python3 publisher.py --topic=TOPIC_A2', + timeout=30, + ) + for line in EXPECTED_PUBLISHER: + assert line in publisher_output, f'Missing in publisher output: {line}' + + time.sleep(5) + subscriber_output = dapr.stop(subscriber) + for line in EXPECTED_HANDLER_SUBSCRIBER: + assert line in subscriber_output, f'Missing in subscriber output: {line}' diff --git a/tests/examples/test_pubsub_streaming_async.py b/tests/examples/test_pubsub_streaming_async.py new file mode 100644 index 000000000..d98a9a670 --- /dev/null +++ b/tests/examples/test_pubsub_streaming_async.py @@ -0,0 +1,69 @@ +import time + +import pytest + +EXPECTED_SUBSCRIBER = [ + "Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_B1...", + "Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_B1...", + "Processing message: {'id': 3, 
'message': 'hello world'} from TOPIC_B1...", + "Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_B1...", + "Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_B1...", + 'Closing subscription...', +] + +EXPECTED_HANDLER_SUBSCRIBER = [ + "Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_B2...", + "Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_B2...", + "Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_B2...", + "Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_B2...", + "Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_B2...", + 'Closing subscription...', +] + +EXPECTED_PUBLISHER = [ + "{'id': 1, 'message': 'hello world'}", + "{'id': 2, 'message': 'hello world'}", + "{'id': 3, 'message': 'hello world'}", + "{'id': 4, 'message': 'hello world'}", + "{'id': 5, 'message': 'hello world'}", +] + + +@pytest.mark.example_dir('pubsub-streaming-async') +def test_pubsub_streaming_async(dapr): + subscriber = dapr.start( + '--app-id python-subscriber --app-protocol grpc -- python3 subscriber.py --topic=TOPIC_B1', + wait=5, + ) + publisher_output = dapr.run( + '--app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 ' + '--enable-app-health-check -- python3 publisher.py --topic=TOPIC_B1', + timeout=30, + ) + for line in EXPECTED_PUBLISHER: + assert line in publisher_output, f'Missing in publisher output: {line}' + + time.sleep(5) + subscriber_output = dapr.stop(subscriber) + for line in EXPECTED_SUBSCRIBER: + assert line in subscriber_output, f'Missing in subscriber output: {line}' + + +@pytest.mark.example_dir('pubsub-streaming-async') +def test_pubsub_streaming_async_handler(dapr): + subscriber = dapr.start( + '--app-id python-subscriber --app-protocol grpc -- python3 subscriber-handler.py --topic=TOPIC_B2', + wait=5, + ) + publisher_output = dapr.run( + '--app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 ' + 
'--enable-app-health-check -- python3 publisher.py --topic=TOPIC_B2', + timeout=30, + ) + for line in EXPECTED_PUBLISHER: + assert line in publisher_output, f'Missing in publisher output: {line}' + + time.sleep(5) + subscriber_output = dapr.stop(subscriber) + for line in EXPECTED_HANDLER_SUBSCRIBER: + assert line in subscriber_output, f'Missing in subscriber output: {line}' diff --git a/tests/examples/test_secret_store.py b/tests/examples/test_secret_store.py new file mode 100644 index 000000000..f14baf0eb --- /dev/null +++ b/tests/examples/test_secret_store.py @@ -0,0 +1,33 @@ +import pytest + +EXPECTED_LINES = [ + "{'secretKey': 'secretValue'}", + "[('random', {'random': 'randomValue'}), ('secretKey', {'secretKey': 'secretValue'})]", + "{'random': 'randomValue'}", +] + +EXPECTED_LINES_WITH_ACL = [ + "{'secretKey': 'secretValue'}", + "[('secretKey', {'secretKey': 'secretValue'})]", + 'Got expected error for accessing random key', +] + + +@pytest.mark.example_dir('secret_store') +def test_secret_store(dapr): + output = dapr.run( + '--app-id=secretsapp --app-protocol grpc --resources-path components/ -- python3 example.py', + timeout=30, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('secret_store') +def test_secret_store_with_access_control(dapr): + output = dapr.run( + '--app-id=secretsapp --app-protocol grpc --config config.yaml --resources-path components/ -- python3 example.py', + timeout=30, + ) + for line in EXPECTED_LINES_WITH_ACL: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_state_store.py b/tests/examples/test_state_store.py new file mode 100644 index 000000000..05d67d032 --- /dev/null +++ b/tests/examples/test_state_store.py @@ -0,0 +1,25 @@ +import pytest + +EXPECTED_LINES = [ + 'State store has successfully saved value_1 with key_1 as key', + 'Cannot save due to bad etag. 
ErrorCode=StatusCode.ABORTED', + 'State store has successfully saved value_2 with key_2 as key', + 'State store has successfully saved value_3 with key_3 as key', + 'Cannot save bulk due to bad etags. ErrorCode=StatusCode.ABORTED', + "Got value=b'value_1' eTag=1", + "Got items with etags: [(b'value_1_updated', '2'), (b'value_2', '2')]", + 'Transaction with outbox pattern executed successfully!', + "Got value after outbox pattern: b'val1'", + "Got values after transaction delete: [b'', b'']", + "Got value after delete: b''", +] + + +@pytest.mark.example_dir('state_store') +def test_state_store(dapr): + output = dapr.run( + '--resources-path components/ -- python3 state_store.py', + timeout=30, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_state_store_query.py b/tests/examples/test_state_store_query.py new file mode 100644 index 000000000..31ab3b1c9 --- /dev/null +++ b/tests/examples/test_state_store_query.py @@ -0,0 +1,49 @@ +import subprocess + +import pytest + +EXPECTED_LINES = [ + '1 {"city": "Seattle", "person": {"id": 1036.0, "org": "Dev Ops"}, "state": "WA"}', + '4 {"city": "Spokane", "person": {"id": 1042.0, "org": "Dev Ops"}, "state": "WA"}', + '10 {"city": "New York", "person": {"id": 1054.0, "org": "Dev Ops"}, "state": "NY"}', + 'Token: 3', + '9 {"city": "San Diego", "person": {"id": 1002.0, "org": "Finance"}, "state": "CA"}', + '7 {"city": "San Francisco", "person": {"id": 1015.0, "org": "Dev Ops"}, "state": "CA"}', + '3 {"city": "Sacramento", "person": {"id": 1071.0, "org": "Finance"}, "state": "CA"}', + 'Token: 6', +] + + +@pytest.fixture() +def mongodb(): + # Remove leftover container from a previous crashed run + subprocess.run('docker rm -f pytest-mongodb', shell=True, capture_output=True) + subprocess.run( + 'docker run -d --rm -p 27017:27017 --name pytest-mongodb mongo:5', + shell=True, + check=True, + capture_output=True, + ) + yield + subprocess.run('docker rm -f 
pytest-mongodb', shell=True, capture_output=True) + + +@pytest.fixture() +def import_data(mongodb, dapr): + """Import the test dataset into the state store via Dapr.""" + dapr.run( + '--app-id demo --dapr-http-port 3500 --resources-path components ' + '-- curl -X POST -H "Content-Type: application/json" ' + '-d @dataset.json http://localhost:3500/v1.0/state/statestore', + timeout=15, + ) + + +@pytest.mark.example_dir('state_store_query') +def test_state_store_query(dapr, import_data): + output = dapr.run( + '--app-id queryexample --resources-path components/ -- python3 state_store_query.py', + timeout=10, + ) + for line in EXPECTED_LINES: + assert line in output, f'Missing in output: {line}' diff --git a/tests/examples/test_w3c_tracing.py b/tests/examples/test_w3c_tracing.py new file mode 100644 index 000000000..4cd53780a --- /dev/null +++ b/tests/examples/test_w3c_tracing.py @@ -0,0 +1,25 @@ +import pytest + +EXPECTED_CALLER = [ + 'application/json', + 'SAY', + 'text/plain', + 'SLEEP', + 'Trace ID matches after forwarding', +] + + +@pytest.mark.example_dir('w3c-tracing') +def test_w3c_tracing(dapr): + receiver = dapr.start( + '--app-id invoke-receiver --app-protocol grpc --app-port 3001 python3 invoke-receiver.py', + wait=5, + ) + caller_output = dapr.run( + '--app-id invoke-caller --app-protocol grpc python3 invoke-caller.py', + timeout=30, + ) + for line in EXPECTED_CALLER: + assert line in caller_output, f'Missing in caller output: {line}' + + dapr.stop(receiver) diff --git a/tests/examples/test_workflow.py b/tests/examples/test_workflow.py new file mode 100644 index 000000000..b05b3a299 --- /dev/null +++ b/tests/examples/test_workflow.py @@ -0,0 +1,46 @@ +import pytest + +EXPECTED_TASK_CHAINING = [ + 'Step 1: Received input: 42.', + 'Step 2: Received input: 43.', + 'Step 3: Received input: 86.', + 'Workflow completed! 
Status: WorkflowStatus.COMPLETED', +] + +EXPECTED_FAN_OUT_FAN_IN = [ + 'Final result: 110.', +] + +EXPECTED_SIMPLE = [ + 'Hi Counter!', + 'New counter value is: 1!', + 'New counter value is: 11!', + 'Retry count value is: 0!', + 'Retry count value is: 1! This print statement verifies retry', + 'Get response from hello_world_wf after pause call: SUSPENDED', + 'Get response from hello_world_wf after resume call: RUNNING', + 'New counter value is: 111!', + 'New counter value is: 1111!', + 'Workflow completed! Result: Completed', +] + + +@pytest.mark.example_dir('workflow') +def test_task_chaining(dapr): + output = dapr.run('-- python3 task_chaining.py', timeout=30) + for line in EXPECTED_TASK_CHAINING: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('workflow') +def test_fan_out_fan_in(dapr): + output = dapr.run('-- python3 fan_out_fan_in.py', timeout=60) + for line in EXPECTED_FAN_OUT_FAN_IN: + assert line in output, f'Missing in output: {line}' + + +@pytest.mark.example_dir('workflow') +def test_simple_workflow(dapr): + output = dapr.run('-- python3 simple.py', timeout=60) + for line in EXPECTED_SIMPLE: + assert line in output, f'Missing in output: {line}' diff --git a/tests/integration/AGENTS.md b/tests/integration/AGENTS.md new file mode 100644 index 000000000..bb391a418 --- /dev/null +++ b/tests/integration/AGENTS.md @@ -0,0 +1,94 @@ +# AGENTS.md — Programmatic Integration Tests + +This directory contains **programmatic SDK tests** that call `DaprClient` methods directly and assert on return values, gRPC status codes, and SDK types. Unlike the output-based tests in `tests/examples/` (which run example scripts and check stdout), these tests don't depend on print statement formatting. + +## How it works + +1. `DaprTestEnvironment` (defined in `conftest.py`) manages Dapr sidecar processes +2. `start_sidecar()` launches `dapr run` with explicit ports, waits for the health check, and returns a connected `DaprClient` +3. 
Tests call SDK methods on that client and assert on the response objects +4. Sidecar stdout is written to temp files (not pipes) to avoid buffer deadlocks +5. Cleanup terminates sidecars, closes clients, and removes log files + +Run locally (requires a running Dapr runtime via `dapr init`): + +```bash +# All integration tests +tox -e integration + +# Single test file +tox -e integration -- test_state_store.py + +# Single test +tox -e integration -- test_state_store.py -k test_save_and_get +``` + +## Directory structure + +``` +tests/integration/ +├── conftest.py # DaprTestEnvironment + fixtures (dapr_env, apps_dir, components_dir) +├── test_*.py # Test files (one per building block) +├── apps/ # Helper apps started alongside sidecars +│ ├── invoke_receiver.py # gRPC method handler for invoke tests +│ └── pubsub_subscriber.py # Subscriber that persists messages to state store +├── components/ # Dapr component YAMLs loaded by all sidecars +│ ├── statestore.yaml # state.redis +│ ├── pubsub.yaml # pubsub.redis +│ ├── lockstore.yaml # lock.redis +│ ├── configurationstore.yaml # configuration.redis +│ └── localsecretstore.yaml # secretstores.local.file +└── secrets.json # Secrets file for localsecretstore component +``` + +## Fixtures + +All fixtures are **module-scoped** — one sidecar per test file. + +| Fixture | Type | Description | +|---------|------|-------------| +| `dapr_env` | `DaprTestEnvironment` | Manages sidecar lifecycle; call `start_sidecar()` to get a client | +| `apps_dir` | `Path` | Path to `tests/integration/apps/` | +| `components_dir` | `Path` | Path to `tests/integration/components/` | + +Each test file defines its own module-scoped `client` fixture that calls `dapr_env.start_sidecar(...)`. 
+ +## Building blocks covered + +| Test file | Building block | SDK methods tested | +|-----------|---------------|-------------------| +| `test_state_store.py` | State management | `save_state`, `get_state`, `save_bulk_state`, `get_bulk_state`, `execute_state_transaction`, `delete_state` | +| `test_invoke.py` | Service invocation | `invoke_method` | +| `test_pubsub.py` | Pub/sub | `publish_event`, `get_state` (to verify delivery) | +| `test_secret_store.py` | Secrets | `get_secret`, `get_bulk_secret` | +| `test_metadata.py` | Metadata | `get_metadata`, `set_metadata` | +| `test_distributed_lock.py` | Distributed lock | `try_lock`, `unlock`, context manager | +| `test_configuration.py` | Configuration | `get_configuration`, `subscribe_configuration`, `unsubscribe_configuration` | + +## Port allocation + +All sidecars default to gRPC port 50001 and HTTP port 3500. Since fixtures are module-scoped and tests run sequentially, only one sidecar is active at a time. If parallel execution is needed in the future, sidecars will need dynamic port allocation. + +## Helper apps + +Some building blocks (invoke, pubsub) require an app process running alongside the sidecar: + +- **`invoke_receiver.py`** — A `dapr.ext.grpc.App` that handles `my-method` and returns `INVOKE_RECEIVED`. +- **`pubsub_subscriber.py`** — Subscribes to `TOPIC_A` and persists received messages to the state store. This lets tests verify message delivery by reading state rather than parsing stdout. + +## Adding a new test + +1. Create `test_<building_block>.py` +2. Add a module-scoped `client` fixture that calls `dapr_env.start_sidecar(app_id='test-<building-block>')` +3. If the building block needs a new Dapr component, add a YAML to `components/` +4. If the building block needs a running app, add it to `apps/` and pass `app_cmd` / `app_port` to `start_sidecar()` +5. Use unique keys/resource IDs per test to avoid interference (the sidecar is shared within a module) +6.
Assert on SDK return types and gRPC status codes, not on string output + +## Gotchas + +- **Requires `dapr init`** — the tests assume a local Dapr runtime with Redis (`dapr_redis` container on `localhost:6379`), which `dapr init` sets up automatically. +- **Configuration tests seed Redis directly** via `docker exec dapr_redis redis-cli`. +- **Lock and configuration APIs are alpha** and emit `UserWarning` on every call. Tests suppress these with `pytestmark = pytest.mark.filterwarnings('ignore::UserWarning')`. +- **`localsecretstore.yaml` uses a relative path** (`secrets.json`) resolved against `cwd=INTEGRATION_DIR`. +- **Dapr may normalize response fields** — e.g., `content_type` may lose charset parameters when proxied through gRPC. Assert on the media type prefix, not the full string. diff --git a/tests/integration/apps/invoke_receiver.py b/tests/integration/apps/invoke_receiver.py new file mode 100644 index 000000000..41592eb0e --- /dev/null +++ b/tests/integration/apps/invoke_receiver.py @@ -0,0 +1,13 @@ +"""gRPC method handler for invoke integration tests.""" + +from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse + +app = App() + + +@app.method(name='my-method') +def my_method(request: InvokeMethodRequest) -> InvokeMethodResponse: + return InvokeMethodResponse(b'INVOKE_RECEIVED', 'text/plain; charset=UTF-8') + + +app.run(50051) diff --git a/tests/integration/apps/pubsub_subscriber.py b/tests/integration/apps/pubsub_subscriber.py new file mode 100644 index 000000000..110fa14c8 --- /dev/null +++ b/tests/integration/apps/pubsub_subscriber.py @@ -0,0 +1,26 @@ +"""Pub/sub subscriber that persists received messages to state store. + +Used by integration tests to verify message delivery without relying on stdout. 
+""" + +import json + +from cloudevents.sdk.event import v1 +from dapr.ext.grpc import App + +from dapr.clients import DaprClient +from dapr.clients.grpc._response import TopicEventResponse + +app = App() + + +@app.subscribe(pubsub_name='pubsub', topic='TOPIC_A') +def handle_topic_a(event: v1.Event) -> TopicEventResponse: + data = json.loads(event.Data()) + key = f'received-{data["run_id"]}-{data["id"]}' + with DaprClient() as d: + d.save_state('statestore', key, event.Data()) + return TopicEventResponse('success') + + +app.run(50051) diff --git a/tests/integration/components/configurationstore.yaml b/tests/integration/components/configurationstore.yaml new file mode 100644 index 000000000..fcf6569d0 --- /dev/null +++ b/tests/integration/components/configurationstore.yaml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: configurationstore +spec: + type: configuration.redis + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/components/localsecretstore.yaml b/tests/integration/components/localsecretstore.yaml new file mode 100644 index 000000000..fd574a077 --- /dev/null +++ b/tests/integration/components/localsecretstore.yaml @@ -0,0 +1,13 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: localsecretstore +spec: + type: secretstores.local.file + metadata: + - name: secretsFile + # Relative to the Dapr process CWD (tests/integration/), set by + # DaprTestEnvironment via cwd=INTEGRATION_DIR. 
+ value: secrets.json + - name: nestedSeparator + value: ":" diff --git a/tests/integration/components/lockstore.yaml b/tests/integration/components/lockstore.yaml new file mode 100644 index 000000000..424caceeb --- /dev/null +++ b/tests/integration/components/lockstore.yaml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: lockstore +spec: + type: lock.redis + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/components/pubsub.yaml b/tests/integration/components/pubsub.yaml new file mode 100644 index 000000000..18764d8ce --- /dev/null +++ b/tests/integration/components/pubsub.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/components/statestore.yaml b/tests/integration/components/statestore.yaml new file mode 100644 index 000000000..a0c53bc40 --- /dev/null +++ b/tests/integration/components/statestore.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 000000000..5552b2038 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,136 @@ +import shlex +import subprocess +import tempfile +import time +from pathlib import Path +from typing import Any, Generator + +import pytest + +from dapr.clients import DaprClient +from dapr.conf import settings + +INTEGRATION_DIR = Path(__file__).resolve().parent +COMPONENTS_DIR = INTEGRATION_DIR / 'components' +APPS_DIR = INTEGRATION_DIR / 'apps' + + +class DaprTestEnvironment: + """Manages Dapr sidecars and returns SDK clients for 
programmatic testing. + + Unlike tests.examples.DaprRunner (which captures stdout for output-based assertions), this + class returns real DaprClient instances so tests can make assertions against SDK return values. + """ + + def __init__(self, default_components: Path = COMPONENTS_DIR) -> None: + self._default_components = default_components + self._processes: list[subprocess.Popen[str]] = [] + self._log_files: list[Path] = [] + self._clients: list[DaprClient] = [] + + def start_sidecar( + self, + app_id: str, + *, + grpc_port: int = 50001, + http_port: int = 3500, + app_port: int | None = None, + app_cmd: str | None = None, + components: Path | None = None, + wait: int = 5, + ) -> DaprClient: + """Start a Dapr sidecar and return a connected DaprClient. + + Args: + app_id: Dapr application ID. + grpc_port: Sidecar gRPC port. + http_port: Sidecar HTTP port (also used for the SDK health check). + app_port: Port the app listens on (implies ``--app-protocol grpc``). + app_cmd: Shell command to start alongside the sidecar. + components: Path to component YAML directory. Defaults to + ``tests/integration/components/``. + wait: Seconds to sleep after launching (before the SDK health check). 
+ """ + resources = components or self._default_components + + cmd = [ + 'dapr', + 'run', + '--app-id', + app_id, + '--resources-path', + str(resources), + '--dapr-grpc-port', + str(grpc_port), + '--dapr-http-port', + str(http_port), + ] + if app_port is not None: + cmd.extend(['--app-port', str(app_port), '--app-protocol', 'grpc']) + if app_cmd is not None: + cmd.extend(['--', *shlex.split(app_cmd)]) + + with tempfile.NamedTemporaryFile(mode='w', suffix=f'-{app_id}.log', delete=False) as log: + self._log_files.append(Path(log.name)) + proc = subprocess.Popen( + cmd, + cwd=INTEGRATION_DIR, + stdout=log, + stderr=subprocess.STDOUT, + text=True, + ) + self._processes.append(proc) + + # Give the sidecar a moment to bind its ports before the SDK health + # check starts hitting the HTTP endpoint. + time.sleep(wait) + + # Point the SDK health check at the actual sidecar HTTP port. + # DaprHealth.wait_for_sidecar() reads settings.DAPR_HTTP_PORT, which + # is initialized once at import time and won't reflect a non-default + # http_port unless we update it here. + settings.DAPR_HTTP_PORT = http_port + + client = DaprClient(address=f'127.0.0.1:{grpc_port}') + self._clients.append(client) + return client + + def cleanup(self) -> None: + for client in self._clients: + client.close() + self._clients.clear() + for proc in self._processes: + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + self._processes.clear() + for log_path in self._log_files: + log_path.unlink(missing_ok=True) + self._log_files.clear() + + +@pytest.fixture(scope='module') +def dapr_env() -> Generator[DaprTestEnvironment, Any, None]: + """Provides a DaprTestEnvironment for programmatic SDK testing. + + Module-scoped so that all tests in a file share a single Dapr sidecar, + avoiding port conflicts from rapid start/stop cycles and cutting total + test time significantly. 
+ """ + env = DaprTestEnvironment() + yield env + env.cleanup() + + +@pytest.fixture(scope='module') +def apps_dir() -> Path: + return APPS_DIR + + +@pytest.fixture(scope='module') +def components_dir() -> Path: + return COMPONENTS_DIR diff --git a/tests/integration/secrets.json b/tests/integration/secrets.json new file mode 100644 index 000000000..e8db35141 --- /dev/null +++ b/tests/integration/secrets.json @@ -0,0 +1,4 @@ +{ + "secretKey": "secretValue", + "random": "randomValue" +} diff --git a/tests/integration/test_configuration.py b/tests/integration/test_configuration.py new file mode 100644 index 000000000..10e7df835 --- /dev/null +++ b/tests/integration/test_configuration.py @@ -0,0 +1,90 @@ +import subprocess +import threading +import time + +import pytest + +from dapr.clients.grpc._response import ConfigurationResponse + +STORE = 'configurationstore' +REDIS_CONTAINER = 'dapr_redis' + + +def _redis_set(key: str, value: str, version: int = 1) -> None: + """Seed a configuration value directly in Redis. + + Dapr's Redis configuration store encodes values as ``value||version``. 
+ """ + subprocess.run( + args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'SET', key, f'{value}||{version}'), + check=True, + capture_output=True, + ) + + +@pytest.fixture(scope='module') +def client(dapr_env): + _redis_set('cfg-key-1', 'val-1') + _redis_set('cfg-key-2', 'val-2') + return dapr_env.start_sidecar(app_id='test-config') + + +class TestGetConfiguration: + def test_get_single_key(self, client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) + assert 'cfg-key-1' in resp.items + assert resp.items['cfg-key-1'].value == 'val-1' + + def test_get_multiple_keys(self, client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1', 'cfg-key-2']) + assert resp.items['cfg-key-1'].value == 'val-1' + assert resp.items['cfg-key-2'].value == 'val-2' + + def test_get_missing_key_returns_empty_items(self, client): + resp = client.get_configuration(store_name=STORE, keys=['nonexistent-cfg-key']) + # Dapr omits keys that don't exist from the response. + assert 'nonexistent-cfg-key' not in resp.items + + def test_items_have_version(self, client): + resp = client.get_configuration(store_name=STORE, keys=['cfg-key-1']) + item = resp.items['cfg-key-1'] + assert item.version + + +class TestSubscribeConfiguration: + def test_subscribe_receives_update(self, client): + received: list[ConfigurationResponse] = [] + event = threading.Event() + + def handler(_id: str, resp: ConfigurationResponse) -> None: + received.append(resp) + event.set() + + sub_id = client.subscribe_configuration( + store_name=STORE, keys=['cfg-sub-key'], handler=handler + ) + assert sub_id + + # Give the subscription watcher thread time to establish its gRPC + # stream before pushing the update, otherwise the notification is missed. 
+ time.sleep(1) + _redis_set('cfg-sub-key', 'updated-val', version=2) + event.wait(timeout=10) + + assert len(received) >= 1 + last = received[-1] + assert 'cfg-sub-key' in last.items + assert last.items['cfg-sub-key'].value == 'updated-val' + + ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) + assert ok + + def test_unsubscribe_returns_true(self, client): + sub_id = client.subscribe_configuration( + store_name=STORE, + keys=['cfg-unsub-key'], + handler=lambda _id, _resp: None, + ) + time.sleep(1) + ok = client.unsubscribe_configuration(store_name=STORE, id=sub_id) + assert ok diff --git a/tests/integration/test_distributed_lock.py b/tests/integration/test_distributed_lock.py new file mode 100644 index 000000000..68362c296 --- /dev/null +++ b/tests/integration/test_distributed_lock.py @@ -0,0 +1,66 @@ +import pytest + +from dapr.clients.grpc._response import UnlockResponseStatus + +STORE = 'lockstore' + +# The distributed lock API emits alpha warnings on every call. +pytestmark = pytest.mark.filterwarnings('ignore::UserWarning') + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-lock') + + +class TestTryLock: + def test_acquire_lock(self, client): + lock = client.try_lock(STORE, 'res-acquire', 'owner-a', expiry_in_seconds=10) + assert lock.success + + def test_second_owner_is_rejected(self, client): + first = client.try_lock(STORE, 'res-contention', 'owner-a', expiry_in_seconds=10) + second = client.try_lock(STORE, 'res-contention', 'owner-b', expiry_in_seconds=10) + assert first.success + assert not second.success + + def test_lock_is_truthy_on_success(self, client): + lock = client.try_lock(STORE, 'res-truthy', 'owner-a', expiry_in_seconds=10) + assert bool(lock) is True + + def test_failed_lock_is_falsy(self, client): + client.try_lock(STORE, 'res-falsy', 'owner-a', expiry_in_seconds=10) + contested = client.try_lock(STORE, 'res-falsy', 'owner-b', expiry_in_seconds=10) + assert 
bool(contested) is False + + +class TestUnlock: + def test_unlock_own_lock(self, client): + client.try_lock(STORE, 'res-unlock', 'owner-a', expiry_in_seconds=10) + resp = client.unlock(STORE, 'res-unlock', 'owner-a') + assert resp.status == UnlockResponseStatus.success + + def test_unlock_wrong_owner(self, client): + client.try_lock(STORE, 'res-wrong-owner', 'owner-a', expiry_in_seconds=10) + resp = client.unlock(STORE, 'res-wrong-owner', 'owner-b') + assert resp.status == UnlockResponseStatus.lock_belongs_to_others + + def test_unlock_nonexistent(self, client): + resp = client.unlock(STORE, 'res-does-not-exist', 'owner-a') + assert resp.status == UnlockResponseStatus.lock_does_not_exist + + def test_unlock_frees_resource_for_others(self, client): + client.try_lock(STORE, 'res-release', 'owner-a', expiry_in_seconds=10) + client.unlock(STORE, 'res-release', 'owner-a') + second = client.try_lock(STORE, 'res-release', 'owner-b', expiry_in_seconds=10) + assert second.success + + +class TestLockContextManager: + def test_context_manager_auto_unlocks(self, client): + with client.try_lock(STORE, 'res-ctx', 'owner-a', expiry_in_seconds=10) as lock: + assert lock + + # After the context manager exits, another owner should be able to acquire. 
+ second = client.try_lock(STORE, 'res-ctx', 'owner-b', expiry_in_seconds=10) + assert second.success diff --git a/tests/integration/test_invoke.py b/tests/integration/test_invoke.py new file mode 100644 index 000000000..45abdcdcb --- /dev/null +++ b/tests/integration/test_invoke.py @@ -0,0 +1,34 @@ +import pytest + + +@pytest.fixture(scope='module') +def client(dapr_env, apps_dir): + return dapr_env.start_sidecar( + app_id='invoke-receiver', + grpc_port=50001, + app_port=50051, + app_cmd=f'python3 {apps_dir / "invoke_receiver.py"}', + ) + + +def test_invoke_method_returns_expected_response(client): + resp = client.invoke_method( + app_id='invoke-receiver', + method_name='my-method', + data=b'{"id": 1, "message": "hello world"}', + content_type='application/json', + ) + # The app returns 'text/plain; charset=UTF-8', but Dapr may strip + # parameters when proxying through gRPC, so only check the media type. + assert resp.content_type.startswith('text/plain') + assert resp.data == b'INVOKE_RECEIVED' + + +def test_invoke_method_with_text_data(client): + resp = client.invoke_method( + app_id='invoke-receiver', + method_name='my-method', + data=b'plain text', + content_type='text/plain', + ) + assert resp.data == b'INVOKE_RECEIVED' diff --git a/tests/integration/test_metadata.py b/tests/integration/test_metadata.py new file mode 100644 index 000000000..88430ebbb --- /dev/null +++ b/tests/integration/test_metadata.py @@ -0,0 +1,42 @@ +import pytest + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-metadata') + + +class TestGetMetadata: + def test_application_id_matches(self, client): + meta = client.get_metadata() + assert meta.application_id == 'test-metadata' + + def test_registered_components_present(self, client): + meta = client.get_metadata() + component_types = {c.type for c in meta.registered_components} + assert any(t.startswith('state.') for t in component_types) + + def 
test_registered_components_have_names(self, client): + meta = client.get_metadata() + for comp in meta.registered_components: + assert comp.name + assert comp.type + + +class TestSetMetadata: + def test_set_and_get_roundtrip(self, client): + client.set_metadata('test-key', 'test-value') + meta = client.get_metadata() + assert meta.extended_metadata.get('test-key') == 'test-value' + + def test_overwrite_existing_key(self, client): + client.set_metadata('overwrite-key', 'first') + client.set_metadata('overwrite-key', 'second') + meta = client.get_metadata() + assert meta.extended_metadata['overwrite-key'] == 'second' + + def test_empty_value_is_allowed(self, client): + client.set_metadata('empty-key', '') + meta = client.get_metadata() + assert 'empty-key' in meta.extended_metadata + assert meta.extended_metadata['empty-key'] == '' diff --git a/tests/integration/test_pubsub.py b/tests/integration/test_pubsub.py new file mode 100644 index 000000000..e4037a8c1 --- /dev/null +++ b/tests/integration/test_pubsub.py @@ -0,0 +1,69 @@ +import json +import subprocess +import time +import uuid + +import pytest + +STORE = 'statestore' +PUBSUB = 'pubsub' +TOPIC = 'TOPIC_A' +REDIS_CONTAINER = 'dapr_redis' + + +def _flush_redis() -> None: + """Flush the Dapr Redis instance to prevent state leaking between runs. + + Both the state store and the pubsub component point at the same + ``dapr_redis`` container (see ``tests/integration/components/``), so a + previous run's ``received-*`` keys could otherwise satisfy this test's + assertions even if no new message was delivered. 
+ """ + subprocess.run( + args=('docker', 'exec', REDIS_CONTAINER, 'redis-cli', 'FLUSHDB'), + check=True, + capture_output=True, + ) + + +@pytest.fixture(scope='module') +def client(dapr_env, apps_dir): + _flush_redis() + return dapr_env.start_sidecar( + app_id='test-subscriber', + grpc_port=50001, + app_port=50051, + app_cmd=f'python3 {apps_dir / "pubsub_subscriber.py"}', + wait=10, + ) + + +def test_published_messages_are_received_by_subscriber(client): + run_id = uuid.uuid4().hex + for n in range(1, 4): + client.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=json.dumps({'run_id': run_id, 'id': n, 'message': 'hello world'}), + data_content_type='application/json', + ) + time.sleep(1) + + time.sleep(3) + + for n in range(1, 4): + state = client.get_state(store_name=STORE, key=f'received-{run_id}-{n}') + assert state.data != b'', f'Subscriber did not receive message {n}' + msg = json.loads(state.data) + assert msg['id'] == n + assert msg['message'] == 'hello world' + + +def test_publish_event_succeeds(client): + """Verify publish_event does not raise on a valid topic.""" + client.publish_event( + pubsub_name=PUBSUB, + topic_name=TOPIC, + data=json.dumps({'run_id': uuid.uuid4().hex, 'id': 99, 'message': 'smoke test'}), + data_content_type='application/json', + ) diff --git a/tests/integration/test_secret_store.py b/tests/integration/test_secret_store.py new file mode 100644 index 000000000..b4e8e8679 --- /dev/null +++ b/tests/integration/test_secret_store.py @@ -0,0 +1,19 @@ +import pytest + +STORE = 'localsecretstore' + + +@pytest.fixture(scope='module') +def client(dapr_env, components_dir): + return dapr_env.start_sidecar(app_id='test-secret', components=components_dir) + + +def test_get_secret(client): + resp = client.get_secret(store_name=STORE, key='secretKey') + assert resp.secret == {'secretKey': 'secretValue'} + + +def test_get_bulk_secret(client): + resp = client.get_bulk_secret(store_name=STORE) + assert 'secretKey' in resp.secrets + 
assert resp.secrets['secretKey'] == {'secretKey': 'secretValue'} diff --git a/tests/integration/test_state_store.py b/tests/integration/test_state_store.py new file mode 100644 index 000000000..26ef51cad --- /dev/null +++ b/tests/integration/test_state_store.py @@ -0,0 +1,102 @@ +import grpc +import pytest + +from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType +from dapr.clients.grpc._state import StateItem + +STORE = 'statestore' + + +@pytest.fixture(scope='module') +def client(dapr_env): + return dapr_env.start_sidecar(app_id='test-state') + + +class TestSaveAndGetState: + def test_save_and_get(self, client): + client.save_state(store_name=STORE, key='k1', value='v1') + state = client.get_state(store_name=STORE, key='k1') + assert state.data == b'v1' + assert state.etag + + def test_save_with_wrong_etag_fails(self, client): + client.save_state(store_name=STORE, key='etag-test', value='original') + with pytest.raises(grpc.RpcError) as exc_info: + client.save_state(store_name=STORE, key='etag-test', value='bad', etag='9999') + assert exc_info.value.code() == grpc.StatusCode.ABORTED + + def test_get_missing_key_returns_empty(self, client): + state = client.get_state(store_name=STORE, key='nonexistent-key') + assert state.data == b'' + + +class TestBulkState: + def test_save_and_get_bulk(self, client): + client.save_bulk_state( + store_name=STORE, + states=[ + StateItem(key='bulk-1', value='v1'), + StateItem(key='bulk-2', value='v2'), + ], + ) + items = client.get_bulk_state(store_name=STORE, keys=['bulk-1', 'bulk-2']).items + by_key = {i.key: i.data for i in items} + assert by_key['bulk-1'] == b'v1' + assert by_key['bulk-2'] == b'v2' + + def test_save_bulk_with_wrong_etag_fails(self, client): + client.save_state(store_name=STORE, key='bulk-etag-1', value='original') + with pytest.raises(grpc.RpcError) as exc_info: + client.save_bulk_state( + store_name=STORE, + states=[StateItem(key='bulk-etag-1', value='updated', 
etag='9999')], + ) + assert exc_info.value.code() == grpc.StatusCode.ABORTED + + +class TestStateTransactions: + def test_transaction_upsert(self, client): + client.save_state(store_name=STORE, key='tx-1', value='original') + etag = client.get_state(store_name=STORE, key='tx-1').etag + + client.execute_state_transaction( + store_name=STORE, + operations=[ + TransactionalStateOperation( + operation_type=TransactionOperationType.upsert, + key='tx-1', + data='updated', + etag=etag, + ), + TransactionalStateOperation(key='tx-2', data='new'), + ], + ) + + assert client.get_state(store_name=STORE, key='tx-1').data == b'updated' + assert client.get_state(store_name=STORE, key='tx-2').data == b'new' + + def test_transaction_delete(self, client): + client.save_state(store_name=STORE, key='tx-del-1', value='v1') + client.save_state(store_name=STORE, key='tx-del-2', value='v2') + + client.execute_state_transaction( + store_name=STORE, + operations=[ + TransactionalStateOperation( + operation_type=TransactionOperationType.delete, key='tx-del-1' + ), + TransactionalStateOperation( + operation_type=TransactionOperationType.delete, key='tx-del-2' + ), + ], + ) + + assert client.get_state(store_name=STORE, key='tx-del-1').data == b'' + assert client.get_state(store_name=STORE, key='tx-del-2').data == b'' + + +class TestDeleteState: + def test_delete_single(self, client): + client.save_state(store_name=STORE, key='del-1', value='v1') + client.delete_state(store_name=STORE, key='del-1') + assert client.get_state(store_name=STORE, key='del-1').data == b'' diff --git a/tox.ini b/tox.ini index 698e383e9..9e291d597 100644 --- a/tox.ini +++ b/tox.ini @@ -39,64 +39,47 @@ commands = ruff format [testenv:examples] +; Stdout-based smoke tests that run examples/ and check expected output. 
+; Usage: tox -e examples # run all +; tox -e examples -- test_state_store.py # run one passenv = HOME basepython = python3 -changedir = ./examples/ -deps = - mechanical-markdown - +changedir = ./tests/examples/ commands = - ./validate.sh conversation - ./validate.sh crypto - ./validate.sh metadata - ./validate.sh error_handling - ./validate.sh pubsub-simple - ./validate.sh pubsub-streaming - ./validate.sh pubsub-streaming-async - ./validate.sh state_store - ./validate.sh state_store_query - ./validate.sh secret_store - ./validate.sh invoke-simple - ./validate.sh invoke-custom-data - ./validate.sh demo_actor - ./validate.sh invoke-binding - ./validate.sh grpc_proxying - ./validate.sh w3c-tracing - ./validate.sh distributed_lock - ./validate.sh configuration - ./validate.sh workflow - ./validate.sh jobs - ./validate.sh langgraph-checkpointer - ./validate.sh ../ + pytest {posargs} -v --tb=short + allowlist_externals=* commands_pre = pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands dapr-ext-flask dapr-ext-langgraph dapr-ext-strands - pip install -e {toxinidir}/ \ + pip install -r {toxinidir}/dev-requirements.txt \ + -e {toxinidir}/ \ -e {toxinidir}/ext/dapr-ext-workflow/ \ -e {toxinidir}/ext/dapr-ext-grpc/ \ -e {toxinidir}/ext/dapr-ext-fastapi/ \ -e {toxinidir}/ext/dapr-ext-langgraph/ \ -e {toxinidir}/ext/dapr-ext-strands/ \ - -e {toxinidir}/ext/flask_dapr/ + -e {toxinidir}/ext/flask_dapr/ \ + opentelemetry-exporter-zipkin \ + langchain-ollama \ + uvicorn -[testenv:example-component] -; This environment is used to validate a specific example component. -; Usage: tox -e example-component -- component_name -; Example: tox -e example-component -- conversation +[testenv:integration] +; SDK-based integration tests using DaprClient directly. 
+; Usage: tox -e integration # run all +; tox -e integration -- test_state_store.py # run one passenv = HOME basepython = python3 -changedir = ./examples/ -deps = - mechanical-markdown +changedir = ./tests/integration/ commands = - ./validate.sh {posargs} + pytest {posargs} -v --tb=short allowlist_externals=* commands_pre = pip uninstall -y dapr dapr-ext-grpc dapr-ext-fastapi dapr-ext-langgraph dapr-ext-strands dapr-ext-flask dapr-ext-langgraph dapr-ext-strands - pip install -e {toxinidir}/ \ + pip install -r {toxinidir}/dev-requirements.txt \ + -e {toxinidir}/ \ -e {toxinidir}/ext/dapr-ext-workflow/ \ -e {toxinidir}/ext/dapr-ext-grpc/ \ -e {toxinidir}/ext/dapr-ext-fastapi/ \