Skip to content
Open
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ MANIFEST
.idea
pydgraph.iml

# VS Code
.vscode

# Python Virtual Environments
venv
.venv
Expand Down
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,55 @@ request = txn.create_request(mutations=[mutation], commit_now=True)
txn.do_request(request)
```

### Committing a Transaction

A transaction can be committed using the `Txn#commit()` method. If your transaction
consist solely of `Txn#query` or `Txn#queryWithVars` calls, and no calls to
`Txn#mutate`, then calling `Txn#commit()` is not necessary.

An error is raised if another transaction(s) modify the same data concurrently that was
modified in the current transaction. It is up to the user to retry transactions
when they fail.

```python
txn = client.txn()
try:
# ...
# Perform any number of queries and mutations
# ...
# and finally...
txn.commit()
except pydgraph.AbortedError:
# Retry or handle exception.
finally:
# Clean up. Calling this after txn.commit() is a no-op
# and hence safe.
txn.discard()
```

#### Using Transaction with Context Manager

The Python context manager will automatically perform the "`commit`" action
after all queries and mutations have been done, and perform "`discard`" action
to clean the transaction.
When something goes wrong in the scope of context manager, "`commit`" will not
be called,and the "`discard`" action will be called to drop any potential changes.

```python
with client.begin(read_only=False, best_effort=False) as txn:
# Do some queries or mutations here
```

or you can directly create a transaction from the `Txn` class.

```python
with pydgraph.Txn(client, read_only=False, best_effort=False) as txn:
# Do some queries or mutations here
```

> `client.begin()` can only be used with "`with-as`" blocks, while `pydgraph.Txn` class can be directly called to instantiate a transaction object.


### Running a Query

You can run a query by calling `Txn#query(string)`. You will need to pass in a
Expand Down Expand Up @@ -506,6 +555,28 @@ stub1.close()
stub2.close()
```

#### Use context manager to automatically clean resources

Use function call:

```python
with pydgraph.client_stub(SERVER_ADDR) as stub1:
with pydgraph.client_stub(SERVER_ADDR) as stub2:
client = pydgraph.DgraphClient(stub1, stub2)
```

Use class constructor:

```python
with pydgraph.DgraphClientStub(SERVER_ADDR) as stub1:
with pydgraph.DgraphClientStub(SERVER_ADDR) as stub2:
client = pydgraph.DgraphClient(stub1, stub2)
```

Note: `client` should be used inside the "`with-as`" block. The resources related to
`client` will be automatically released outside the block and `client` is not usable
any more.

### Setting Metadata Headers

Metadata headers such as authentication tokens can be set through the metadata of gRPC methods.
Expand Down
25 changes: 23 additions & 2 deletions pydgraph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

"""Dgraph python client."""

import contextlib
import random
import urllib.parse

Expand Down Expand Up @@ -154,9 +155,9 @@ def handle_alter_future(future):
except Exception as error:
DgraphClient._common_except_alter(error)

def txn(self, read_only=False, best_effort=False):
def txn(self, read_only=False, best_effort=False, **commit_kwargs):
"""Creates a transaction."""
return txn.Txn(self, read_only=read_only, best_effort=best_effort)
return txn.Txn(self, read_only=read_only, best_effort=best_effort, **commit_kwargs)

def any_client(self):
"""Returns a random gRPC client so that requests are distributed evenly among them."""
Expand All @@ -173,6 +174,26 @@ def close(self):
for client in self._clients:
client.close()

@contextlib.contextmanager
def begin(self,
read_only:bool=False, best_effort:bool=False,
timeout = None, metadata = None, credentials = None):
'''Start a managed transaction.

Note
----
Only use this function in ``with-as`` blocks.
'''
tx = self.txn(read_only=read_only, best_effort=best_effort)
try:
yield tx
if read_only == False and tx._finished == False:
tx.commit(timeout=timeout, metadata=metadata, credentials=credentials)
except Exception as e:
raise e
finally:
tx.discard()


def open(connection_string: str) -> DgraphClient:
"""Open a new Dgraph client. Use client.close() to close the client.
Expand Down
33 changes: 33 additions & 0 deletions pydgraph/client_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

"""Stub for RPC request."""

import contextlib
import grpc

from pydgraph.meta import VERSION
Expand All @@ -29,6 +30,14 @@ def __init__(self, addr="localhost:9080", credentials=None, options=None):
self.channel = grpc.secure_channel(addr, credentials, options)

self.stub = api_grpc.DgraphStub(self.channel)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if exc_type is not None:
raise exc_val

def login(self, login_req, timeout=None, metadata=None, credentials=None):
return self.stub.Login(
Expand Down Expand Up @@ -118,3 +127,27 @@ def from_cloud(cloud_endpoint, api_key, options=None):
options=options,
)
return client_stub

@contextlib.contextmanager
def client_stub(addr='localhost:9080', **kwargs):
""" Create a managed DgraphClientStub instance.

Parameters
----------
addr : str, optional
credentials : ChannelCredentials, optional
options: List[Dict]
An optional list of key-value pairs (``channel_arguments``
in gRPC Core runtime) to configure the channel.

Note
----
Only use this function in ``with-as`` blocks.
"""
stub = DgraphClientStub(addr=addr, **kwargs)
try:
yield stub
except Exception as e:
raise e
finally:
stub.close()
26 changes: 22 additions & 4 deletions pydgraph/txn.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class Txn(object):
after calling commit.
"""

def __init__(self, client, read_only=False, best_effort=False):
def __init__(self, client, read_only=False, best_effort=False,
timeout=None, metadata=None, credentials=None):
if not read_only and best_effort:
raise Exception(
"Best effort transactions are only compatible with "
Expand All @@ -45,6 +46,23 @@ def __init__(self, client, read_only=False, best_effort=False):
self._mutated = False
self._read_only = read_only
self._best_effort = best_effort
self._commit_kwargs = {
"timeout": timeout,
"metadata": metadata,
"credentials": credentials
}

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.discard(**self._commit_kwargs)
raise exc_val
if self._read_only == False and self._finished == False:
self.commit(**self._commit_kwargs)
else:
self.discard(**self._commit_kwargs)

def query(
self,
Expand Down Expand Up @@ -201,7 +219,7 @@ def handle_query_future(future):
try:
response = future.result()
except Exception as error:
txn._common_except_mutate(error)
Txn._common_except_mutate(error)

return response

Expand All @@ -212,11 +230,11 @@ def handle_mutate_future(txn, future, commit_now):
response = future.result()
except Exception as error:
try:
txn.discard(timeout=timeout, metadata=metadata, credentials=credentials)
txn.discard(**txn._commit_kwargs)
except:
# Ignore error - user should see the original error.
pass
txn._common_except_mutate(error)
Txn._common_except_mutate(error)

if commit_now:
txn._finished = True
Expand Down
2 changes: 1 addition & 1 deletion tests/test_acct_upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import pydgraph

from . import helper
from tests import helper

CONCURRENCY = 5
FIRSTS = ["Paul", "Eric", "Jack", "John", "Martin"]
Expand Down
1 change: 1 addition & 0 deletions tests/test_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import unittest

from . import helper
import pydgraph


@unittest.skipIf(shutil.which("dgraph") is None, "Dgraph binary not found.")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import pydgraph

from . import helper
from tests import helper


class TestAsync(helper.ClientIntegrationTestCase):
Expand Down
Loading