Asynchronous Progamming Using the Python SDK with Couchbase Server
The Python SDK supports asynchronous programming by offering integration modules for Twisted, gevent, and Python 3.4 asyncio.
Users of any of these frameworks should understand and know how the synchronous API functions before using any asynchronous framework, since the asynchronous functionality is an extension of the synchronous functionality.
Twisted
Refer to the API reference (http://pythonhosted.org/couchbase/api/txcouchbase.html) for more information about the txcouchbase module.
|
Twisted is a high performance and very mature Python asynchronous framework.
Twisted Document (KV) Operations
To use the Python SDK with twisted, use the txcouchbase
module (included with the SDK).
As opposed to the synchronous SDK methods which wait for completion and return Result objects, the txcouchbase.bucket.Bucket
object returns a Twisted Deferred
.
You may configure Deferred
with callback and errback handlers.
Result objects are propagated to the callback as seen in the example below.
from twisted.internet import reactor
from txcouchbase.bucket import Bucket
bucket = Bucket('couchbase://localhost/default')
def on_ok(result):
print "Operation succeeded"
if hasattr(result, 'value'):
print "Value is", result.value
def on_err(err):
print "Operation failed", err
bucket.upsert('id', {'some':['value']}).addCallback(on_ok).addErrback(on_err)
bucket.get('id').addCallback(on_ok).addErrback(on_err)
# tell reactor to stop after 3 seconds
reactor.callLater(3, reactor.stop)
# start up the Twisted reactor (event loop handler) manually
reactor.run()
Twisted N1QL Queries
You may issue N1QL queries in the Twisted API using the n1qlQueryAll()
or n1qlQueryEx()
methods.
The n1qlQueryAll()
method returns an Deferred
which will invoke the callback with an iterable of rows.
The n1qlQueryEx
accepts a user-defined subclass of couchbase.async.n1ql.AsyncN1QLRequest
, with several callbacks implemented, and allows you to handle the rows as they are received from the cluster, rather than buffering the entire resultset in memory before invoking the handler.
@inlineCallbacks
def do_n1ql_query(cb):
rvs = yield cb.n1qlQueryAll(
N1QLQuery('SELECT * from `travel-sample` LIMIT 10'))
for row in rvs:
print row
class RowsHandler(AsyncN1QLRequest):
def __init__(self, *args, **kwargs):
super(RowsHandler, self).__init__(*args, **kwargs)
self.deferred = Deferred()
def on_rows(self, rows):
print "Got {} rows".format(len([x for x in rows]))
def on_done(self):
print "Rows complete!"
self.deferred.callback(self)
def on_error(self, ex):
self.deferred.errback(ex)
@inlineCallbacks
def do_n1ql_query(cb):
yield cb.n1qlQueryEx(RowsHandler, N1QLQuery('SELECT * FROM `travel-sample` LIMIT 10'))
Twisted MapReduce (View) Queries
Like N1QL queries, MapReduce queries are available via the queryAll()
and queryEx()
methods, the former returning all the rows at once, and the latter accepting a subclass of couchbase.async.view.AsyncViewBase
which defines various functions.
@inlineCallbacks
def do_view_query():
rows = yield cb.queryAll(
'beer', 'brewery_beers', limit=20, include_docs=True)
for row in rows:
print row.key, row.doc.value['description'][:10] + '...'
from couchbase.async.view import AsyncViewBase
class RowsHandler(AsyncViewBase):
def __init__(self, *args, **kwargs):
super(RowsHandler, self).__init__(*args, **kwargs)
self.deferred = Deferred()
def on_rows(self, rows):
print '** Got row callback!'
for row in rows:
print row.key, row.doc.value['description'][:10] + '...'
def on_error(self, ex):
self.deferred.errback(ex)
def on_done(self):
self.deferred.callback(self)
@inlineCallbacks
def do_view_query(cb):
handler = cb.queryEx(
RowsHandler, 'beer', 'brewery_beers', limit=20, include_docs=True)
yield handler.deferred
Gevent
gevent is a high performance asynchronous framework. It is very powerful in that it allows for a (mostly) traditional synchronous coding style. It accomplishes this by using coroutines known as greenlets or eventlets.
The Gevent support in the Python SDK is truly native when you use the dedicated module, The above is important because the normal synchronous |
The gevent API is almost identical to the simple synchronous API, though it requires that a different module be imported to properly integrate with gevent. So instead of
from couchbase.bucket import Bucket
cb = Bucket(connstr)
do
from gcouchbase.bucket import Bucket
cb = Bucket(connstr)
Gevent Document (KV) Operations:
Key-Value operations using gcouchbase
is exactly the same as the synchronous couchbase
counterpart
from gcouchbase.bucket import Bucket
bucket = Bucket('couchbase://localhost/default')
bucket.upsert('id', {'some':['value']})
print bucket.get('id').value
Gevent N1QL Queries
The API for N1QL queries with gcouchbase is exactly the same as in the synchronous module:
from gcouchbase.bucket import Bucket
from couchbase.n1ql import N1QLQuery
cb = Bucket('couchbase://localhost/travel-sample')
it = cb.n1ql_query(N1QLQuery('SELECT * from `travel-sample` LIMIT 10'))
for row in it:
print row
Note that if you inspect the returned iterator object, it is actually a different object than the synchronous iterator:
print it
# <gcouchbase.bucket.GN1QLRequest object at 0x103694d50>
Gevent MapReduce Queries
Issuing MapReduce queries with gcouchbase is exactly the same as in the synchronous module
from gcouchbase.bucket import Bucket
cb = Bucket('couchbase://localhost/beer-sample')
resiter = cb.query('beer', 'brewery_beers', limit=10)
for row in resiter:
print row
Asyncio (Python 3.5+)
asyncio is another asynchronous module which ships with Python version 3.5 and later.
The asyncio module is supported in Couchbase via the experimental acouchbase
module.
The acouchbase.bucket.Bucket
object returns asyncio.Future
objects rather than actual results.
To use a bucket from acouchbase
, you need to enable experimental SDK features.
This is essentially an explicit disclaimer which is visible in production code to ensure that potentially unstable code does not accidentally get used in production.
Asyncio Document (KV) Operations
CRUD operations return Future
objects which can then be used for await
clauses.
The future’s result will always be the relevant Result
object for the operation performed.
import asyncio
import couchbase.experimental; couchbase.experimental.enable()
from acouchbase.bucket import Bucket
async def do_crud_op():
cb = Bucket('couchbase://localhost/default')
await cb.connect()
await cb.upsert('id', {'some': 'value'})
return await cb.get('id')
loop = asyncio.get_event_loop()
rv = loop.run_until_complete(do_crud_op())
print(rv.value)
Asyncio N1QL Queries
The API for issuing N1QL queries is almost identical to the synchronous API.
The notable difference is the use of async for
rather than for
when iterating over the results:
async def do_n1ql_query(cb):
it = cb.n1ql_query(N1QLQuery('SELECT * from `travel-sample` LIMIT 10'))
async for row in it:
print(row)
Asyncio MapReduce Queries
The API for issuing MapReduce queries is almost identical to the synchronous API.
The notable difference is the use of async for
rather than for
when iterating over the results:
async def do_view_query(cb):
it = cb.query('beer', 'brewery_beers', limit=20, include_docs=True)
async for row in it:
print(row.key, row.doc.value['description'][:10] + '...')