kinto-http 11.2.0

Creator: bradpython12


kinto-http is the Python library to interact with a Kinto server.
There is also a similar client in JavaScript.

Use pip:
$ pip install kinto-http

Here is an overview of what the API provides:
import kinto_http

client = kinto_http.Client(server_url="http://localhost:8888/v1",
                           auth=('alexis', 'p4ssw0rd'))

records = client.get_records(bucket='default', collection='todos')
for i, record in enumerate(records):
    record['title'] = 'Todo {}'.format(i)

Instantiating a client
The auth parameter is a requests authentication policy.
By default, a simple tuple becomes a Basic Auth Authorization request header, which can authenticate users with Kinto Accounts.
import kinto_http

auth = ('alexis', 'p4ssw0rd')

client = kinto_http.Client(server_url='http://localhost:8888/v1',
                           auth=auth)
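To make the tuple-to-header conversion concrete, here is a minimal standard-library sketch of what a Basic Auth header looks like for the credentials above. The helper name basic_auth_header is hypothetical and not part of kinto-http (the real encoding is done by requests), and the UTF-8 encoding is an assumption.

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header derived from a (user, password) pair.

    Hypothetical helper for illustration only; UTF-8 is an assumption.
    """
    credentials = "{}:{}".format(username, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + token}


headers = basic_auth_header("alexis", "p4ssw0rd")
print(headers["Authorization"])
```

The value after "Basic" is only encoded, not encrypted, so such credentials should travel over HTTPS in real deployments.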
It is also possible to pass a bucket ID and/or collection ID to set them as default values for the parameters of the client operations.
client = Client(bucket="payments", collection="receipts", auth=auth)
After creating a client, you can also replicate an existing one and overwrite
some key arguments.
client2 = client.clone(collection="orders")
An asynchronous client is also available. It has all the same endpoints as the sync client except for the batch operations.
from kinto_http import AsyncClient

auth = ('alexis', 'p4ssw0rd')

client = AsyncClient(server_url='http://localhost:8888/v1', auth=auth)
info = await client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."

Using a Bearer access token to authenticate (OpenID)
import kinto_http

client = kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2"))
The authorization header is prefixed with Bearer by default. If the header_type
is customized on the server, the client must specify the expected type:
kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC")

Passing a string containing Bearer will instantiate a kinto_http.BearerTokenAuth() object automatically.
In other words, kinto_http.Client(auth="Bearer+OIDC XYPJTNsFKV2") is equivalent to kinto_http.Client(auth=kinto_http.BearerTokenAuth("XYPJTNsFKV2", type="Bearer+OIDC"))
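The equivalence above can be pictured as splitting the string into a header type and a token. The following is a rough sketch of that idea, not the library's actual parsing code; the function name split_bearer is hypothetical, and the case-insensitive match is an assumption.

```python
def split_bearer(auth):
    """Return (header_type, token) for strings such as 'Bearer+OIDC <token>'.

    Hypothetical helper: returns None for anything that is not a
    string mentioning 'Bearer' (for example a Basic Auth tuple).
    """
    if not isinstance(auth, str) or "bearer" not in auth.lower():
        return None
    header_type, _, token = auth.partition(" ")
    return header_type, token


print(split_bearer("Bearer+OIDC XYPJTNsFKV2"))
print(split_bearer(("alexis", "p4ssw0rd")))
```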

Custom headers
Custom headers can be specified in the Client constructor, and will be sent in every request:
import kinto_http

client = kinto_http.Client(server_url="http://server/v1", headers={
    "Allow-Access": "CDN",
    "User-Agent": "blocklist-updater"
})

Getting server information
You can use the server_info() method to fetch the server information:
from kinto_http import Client

client = Client(server_url='http://localhost:8888/v1')
info = client.server_info()
assert 'schema' in info['capabilities'], "Server doesn't support schema validation."

Bucket operations

get_bucket(id=None, **kwargs): retrieve single bucket
get_buckets(**kwargs): retrieve all readable buckets
create_bucket(id=None, data=None, **kwargs): create a bucket
update_bucket(id=None, data=None, **kwargs): create or replace an existing bucket
patch_bucket(id=None, changes=None, **kwargs): modify some fields in an existing bucket
delete_bucket(id=None, **kwargs): delete a bucket and everything under it
delete_buckets(**kwargs): delete every writable bucket

Groups operations

get_group(id=None, bucket=None, **kwargs): retrieve single group
get_groups(bucket=None, **kwargs): retrieve all readable groups
create_group(id=None, data=None, bucket=None, **kwargs): create a group
update_group(id=None, data=None, bucket=None, **kwargs): create or replace an existing group
patch_group(id=None, changes=None, bucket=None, **kwargs): modify some fields in an existing group
delete_group(id=None, bucket=None, **kwargs): delete a group and everything under it
delete_groups(bucket=None, **kwargs): delete every writable group

Collection operations

get_collection(id=None, bucket=None, **kwargs): retrieve single collection
get_collections(bucket=None, **kwargs): retrieve all readable collections
create_collection(id=None, data=None, bucket=None, **kwargs): create a collection
update_collection(id=None, data=None, bucket=None, **kwargs): create or replace an existing collection
patch_collection(id=None, changes=None, bucket=None, **kwargs): modify some fields in an existing collection
delete_collection(id=None, bucket=None, **kwargs): delete a collection and everything under it
delete_collections(bucket=None, **kwargs): delete every writable collection

Record operations

get_record(id=None, bucket=None, collection=None, **kwargs): retrieve single record
get_records(bucket=None, collection=None, **kwargs): retrieve all readable records
get_paginated_records(bucket=None, collection=None, **kwargs): paginated list of records
get_records_timestamp(bucket=None, collection=None, **kwargs): return the records timestamp of this collection
create_record(id=None, data=None, bucket=None, collection=None, **kwargs): create a record
update_record(id=None, data=None, bucket=None, collection=None, **kwargs): create or replace an existing record
patch_record(id=None, changes=None, bucket=None, collection=None, **kwargs): modify some fields in an existing record
delete_record(id=None, bucket=None, collection=None, **kwargs): delete a record and everything under it
delete_records(bucket=None, collection=None, **kwargs): delete every writable record

Permissions
Object permissions can be specified or modified by passing a permissions argument to the create_*(), patch_*(), or update_*() methods:
client.create_record(data={'foo': 'bar'},
                     permissions={'read': ['group:groupid']})

record = client.get_record(id='123', collection='todos', bucket='alexis')

Get or create
In some cases, you might want to create a bucket, collection, group or record only if
it doesn’t already exist. To do so, pass if_not_exists=True
to the create_*() methods:
client.create_bucket(id='blog', if_not_exists=True)
client.create_collection(id='articles', bucket='blog', if_not_exists=True)
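The get-or-create behaviour can be sketched against an in-memory dictionary standing in for the server. Everything here (the AlreadyExists exception, the create and create_if_not_exists helpers, the buckets dictionary) is hypothetical and only illustrates the pattern; it is not how kinto-http is implemented.

```python
class AlreadyExists(Exception):
    """Stands in for the error raised when the object is already present."""


def create(store, object_id, data):
    # Refuse to overwrite, like a creation guarded by a precondition.
    if object_id in store:
        raise AlreadyExists(object_id)
    store[object_id] = data
    return data


def create_if_not_exists(store, object_id, data):
    # Try to create; if the object is already there, return the existing one.
    try:
        return create(store, object_id, data)
    except AlreadyExists:
        return store[object_id]


buckets = {}
first = create_if_not_exists(buckets, "blog", {"id": "blog", "owner": "alexis"})
second = create_if_not_exists(buckets, "blog", {"id": "blog", "owner": "bob"})
print(second["owner"])
```

The second call leaves the stored object untouched and hands back the one created first, which is the point of the flag: the caller gets a usable object either way.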

Delete if exists
In some cases, you might want to delete a bucket, collection, group or record only if
it already exists. To do so, pass if_exists=True
to the delete_*() methods:
client.delete_bucket(id='bucket', if_exists=True)

Patch operations
The .patch_*() operations receive a changes parameter.
from kinto_http.patch_type import BasicPatch, MergePatch, JSONPatch

client.patch_record(id='abc', changes=BasicPatch({'over': 'write'}))

client.patch_record(id='todo', changes=MergePatch({'assignee': 'bob'}))

client.patch_record(id='receipts', changes=JSONPatch([
    {'op': 'add', 'path': '/data/members/0', 'value': ''}
]))
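To show how a merge patch differs from a plain overwrite, here is a local sketch of JSON Merge Patch semantics as described in RFC 7396. It assumes that MergePatch maps to that format on the server; the merge_patch function below is purely illustrative, runs locally, and is not part of kinto-http.

```python
def merge_patch(target, patch):
    """Apply a JSON Merge Patch (RFC 7396) to a plain Python structure."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target entirely.
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            # null in a merge patch means "remove this member".
            result.pop(key, None)
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


record = {"title": "Pay rent", "meta": {"assignee": "alice", "done": False}}
patched = merge_patch(record, {"meta": {"assignee": "bob", "done": None}})
print(patched)
```

Nested objects are merged member by member, whereas replacing the "meta" field wholesale would discard any members not listed in the change.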

Concurrency control
The create_*(), patch_*(), and update_*() methods take a safe argument (default: True).
If True, the client will ensure that the object doesn’t exist already for creations, or wasn’t modified on the server side since we fetched it. The timestamp will be implicitly read from the last_modified field in the passed data object, or taken explicitly from the if_match parameter.
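The rule described above can be sketched as a choice between two conditional HTTP headers. This is an illustration of the idea under stated assumptions, not the client's actual code: the function name precondition_headers is hypothetical, and giving if_match precedence over the last_modified field is an assumption.

```python
def precondition_headers(safe=True, data=None, if_match=None):
    """Pick the conditional request header for a write (illustrative only)."""
    if not safe:
        return {}
    timestamp = if_match
    if timestamp is None and data is not None:
        timestamp = data.get("last_modified")
    if timestamp is None:
        # Nothing known about the object: succeed only if it does not exist yet.
        return {"If-None-Match": "*"}
    # Succeed only if the server copy still carries this timestamp.
    return {"If-Match": '"{}"'.format(timestamp)}


print(precondition_headers(data={"id": "abc"}))
print(precondition_headers(data={"id": "abc", "last_modified": 1533762576015}))
```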

Batching operations
Rather than issuing a request for each and every operation, it is possible to
batch several operations in one request (sync client only).
Using the batch() method as a Python context manager (with):
with client.batch() as batch:
    for idx in range(0, 100):
        batch.update_record(data={'id': idx})

Besides the results() method, a batch object shares all the same methods as
a regular client.

Reading data from batch operations is achieved by using the results() method
available after a batch context is closed.
with client.batch() as batch:
    ...  # queue three operations on the batch object here

r1, r2, r3 = batch.results()
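The context-manager behaviour (queue inside the block, send once on exit, read results afterwards) can be sketched as follows. BatchSketch, fake_send and the request dictionaries are hypothetical stand-ins written for this example; they do not reproduce the library's internals or the exact wire format.

```python
class BatchSketch:
    """Queue operations inside a with-block and send them together on exit."""

    def __init__(self, send_batch):
        self._send_batch = send_batch  # callable receiving the queued requests
        self._requests = []
        self._responses = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, traceback):
        # Only send when the block finished without raising.
        if exc_type is None and self._requests:
            self._responses = self._send_batch(self._requests)
        return False

    def update_record(self, data):
        self._requests.append({
            "method": "PUT",
            "path": "/records/{}".format(data["id"]),
            "body": {"data": data},
        })

    def results(self):
        return list(self._responses)


sent = []


def fake_send(requests):
    # Pretend server: record the single call and echo each body back.
    sent.append(list(requests))
    return [request["body"]["data"] for request in requests]


with BatchSketch(fake_send) as batch:
    for idx in range(3):
        batch.update_record(data={"id": idx})

r1, r2, r3 = batch.results()
print(len(sent), r1, r2, r3)
```

Three operations were queued but the fake server was contacted only once, which is the saving that batching is meant to provide.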

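Errors
Failing operations will raise a KintoException, which has request and response attributes.
try:
    client.create_group(id="friends")
except kinto_http.KintoException as e:
    # A requests Response is falsy for 4xx/5xx statuses, so compare against None explicitly.
    if e.response is not None and e.response.status_code == 403:
        print("Not allowed!")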

Requests Timeout
A timeout value in seconds can be specified in the client constructor:
client = Client(server_url="...", timeout=5)
To distinguish the connect from the read timeout, use a tuple:
client = Client(server_url="...", timeout=(3.05, 27))
For an infinite timeout, use None:
client = Client(server_url="...", timeout=None)
See the timeout documentation of the underlying requests library.

Retry on error
When the server is throttled (under heavy load or maintenance) it can
return error responses.
The client can then retry the same request until it succeeds.
To enable this, specify the number of retries on the client:
client = Client(server_url='http://localhost:8888/v1',
                retry=10)
The Kinto protocol lets the server define the duration in seconds between retries.
It is possible (but not recommended) to force this value in the clients:
client = Client(server_url='http://localhost:8888/v1',
                retry=10,
                retry_after=5)
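The retry behaviour can be sketched as a small loop that waits for a server-provided delay unless one is forced. This is an illustration only: the call_with_retry function is hypothetical, and treating 5xx statuses as retriable and reading the delay from a Retry-After header are assumptions rather than a description of the library's code.

```python
import time


def call_with_retry(send, retry=0, retry_after=None, sleep=time.sleep):
    """Call send() again on server errors, up to `retry` extra attempts."""
    attempts_left = retry
    while True:
        status, headers, body = send()
        if status < 500 or attempts_left == 0:
            return status, body
        if retry_after is not None:
            delay = retry_after  # value forced by the caller
        else:
            delay = float(headers.get("Retry-After", 0))  # value chosen by the server
        sleep(delay)
        attempts_left -= 1


# Fake server: two throttled responses, then success.
responses = iter([
    (503, {"Retry-After": "2"}, None),
    (503, {"Retry-After": "2"}, None),
    (200, {}, {"data": []}),
])
waits = []
status, body = call_with_retry(lambda: next(responses), retry=3, sleep=waits.append)
print(status, waits)
```

Passing sleep as a parameter keeps the sketch testable without actually waiting.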

Pagination
When the server responses are paginated, the client will download every page and
merge them transparently.
The get_paginated_records() method returns a generator that will yield each page:
for page in client.get_paginated_records():
    records = page["data"]
It is possible to specify a limit for the number of items to be retrieved in one page:
records = client.get_records(_limit=10)
To retrieve every available page with a limited number of items in each
of them, you can specify the number of pages:
records = client.get_records(_limit=10, pages=float('inf')) # Infinity
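The relationship between the page generator and the merged list can be sketched with a fake server. The iter_pages and get_all helpers, the FAKE_SERVER dictionary and its URLs are hypothetical; following a Next-Page response header is an assumption about the protocol, and none of this is the library's own code.

```python
def iter_pages(fetch, url, max_pages=float("inf")):
    """Yield one response body per page, following the Next-Page header."""
    fetched = 0
    while url is not None and fetched < max_pages:
        body, headers = fetch(url)
        yield body
        fetched += 1
        url = headers.get("Next-Page")


def get_all(fetch, url, max_pages=float("inf")):
    """Merge the records of every fetched page into a single list."""
    records = []
    for page in iter_pages(fetch, url, max_pages):
        records.extend(page["data"])
    return records


# Fake two-page server keyed by URL (made-up URLs).
FAKE_SERVER = {
    "/records?_limit=2": (
        {"data": [{"id": "a"}, {"id": "b"}]},
        {"Next-Page": "/records?_limit=2&page=2"},
    ),
    "/records?_limit=2&page=2": ({"data": [{"id": "c"}]}, {}),
}

records = get_all(FAKE_SERVER.__getitem__, "/records?_limit=2")
first_page_only = get_all(FAKE_SERVER.__getitem__, "/records?_limit=2", max_pages=1)
print(len(records), len(first_page_only))
```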

History
If the built-in history plugin is enabled, it is possible to retrieve the history of changes:
# Get the complete history of a bucket
changes = client.get_history(bucket='default')

# and optionally use filters
hist = client.get_history(bucket='default', _limit=2, _sort='-last_modified', _since='1533762576015')
hist = client.get_history(bucket='default', resource_name='collection')
The history of a bucket can also be purged with:
client.purge_history(bucket='default')

Endpoint URLs
The get_endpoint() method returns an endpoint URL on the server:
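client = Client(server_url='http://localhost:8888/v1',
                auth=('token', 'your-token'),
                bucket="payments",
                collection="receipts")

print(client.get_endpoint("record",
                          id="c6894b2c-1856-11e6-9415-3c970ede22b0"))

# '/buckets/payments/collections/receipts/records/c6894b2c-1856-11e6-9415-3c970ede22b0'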
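The shape of the returned path suggests simple URL templates filled in with the bucket, collection and record IDs. The following sketch reproduces that shape; the ENDPOINTS table and build_endpoint function are hypothetical and only mirror the path shown above, not the library's internal table.

```python
# Hypothetical URL templates, modelled on the path shown above.
ENDPOINTS = {
    "bucket": "/buckets/{bucket}",
    "collection": "/buckets/{bucket}/collections/{collection}",
    "record": "/buckets/{bucket}/collections/{collection}/records/{id}",
}


def build_endpoint(name, **ids):
    """Fill the template for `name` with the given identifiers."""
    return ENDPOINTS[name].format(**ids)


path = build_endpoint(
    "record",
    bucket="payments",
    collection="receipts",
    id="c6894b2c-1856-11e6-9415-3c970ede22b0",
)
print(path)
```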

Handling datetime and date objects
In addition to the data types supported by JSON, kinto-http also
supports native Python date and datetime objects.
If a payload contains a date or a datetime object, kinto-http
will encode it as an ISO formatted string.
Please note that this transformation is only one-way. While reading a
record, if a field contains an ISO formatted string, kinto-http will
not convert it to a native Python date or datetime object.
If you know that a field will be a datetime, you might consider
encoding it yourself to be more explicit about it being a string for
Kinto.
Command-line scripts
In order to have common arguments and options for scripts, some utilities are provided
to ease configuration and initialization of client from command-line arguments.
import argparse
import logging

from kinto_http import cli_utils

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download records")
    # Register the common server options (--server, --auth, --bucket, ...).
    cli_utils.set_parser_server_options(parser)

    args = parser.parse_args()

    cli_utils.setup_logger(logger, args)

    logger.debug("Instantiate Kinto client.")
    client = cli_utils.create_client_from_args(args)

    logger.info("Fetch records.")
    records = client.get_records()
    logger.warning("{} records.".format(len(records)))
The script now accepts basic options:
$ python --help

usage: [-h] [-s SERVER] [-a AUTH] [-b BUCKET] [-c COLLECTION] [-v]
       [-q] [-D]

Download records

optional arguments:
  -h, --help            show this help message and exit
  -s SERVER, --server SERVER
                        The location of the remote server (with prefix)
  -a AUTH, --auth AUTH  BasicAuth credentials: `token:my-secret` or
                        Authorization header: `Bearer token`
  -b BUCKET, --bucket BUCKET
                        Bucket name.
  -c COLLECTION, --collection COLLECTION
                        Collection name.
  --retry RETRY         Number of retries when a request fails
  --retry-after RETRY_AFTER
                        Delay in seconds between retries when requests fail
                        (default: provided by server)
  -v, --verbose         Show all messages.
  -q, --quiet           Show only critical errors.
  -D, --debug           Show all messages, including debug messages.

See contributing docs


For personal and professional use. You cannot resell or redistribute these repositories in their original state.
