
Preserves Schema

A Preserves schema connects Preserves Values to host-language data structures. Each definition within a schema can be processed by a compiler to produce

  • a simple host-language type definition;

  • a partial parsing function from Values to instances of the produced type; and

  • a total serialization function from instances of the type to Values.

Every parsed Value retains enough information to be serialized again, and every instance of a host-language data structure contains, by construction, enough information to be serialized successfully.

Schema support in Python

The preserves.schema module implements Preserves Schema for Python.

A Schema source file (like this one) is first compiled using preserves-schemac to produce a binary-syntax schema bundle containing schema module definitions (like this one). Python code then loads the bundle, exposing its contents as Namespaces ultimately containing SchemaObjects.

Examples

Setup: Loading a schema bundle

For our running example, we will use schemas associated with the Syndicated Actor Model. (The schema bundle is a copy of this file from the syndicate-protocols repository.)

To load a schema bundle, use load_schema_file (or, alternatively, use Compiler directly):

>>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')
>>> type(bundle)
<class 'preserves.schema.Namespace'>

The top-level entries in the loaded bundle are schema modules. Let's examine the stream schema module, whose source code indicates that it should contain definitions for Mode, Source, Sink, etc.:

>>> bundle.stream                                           # doctest: +ELLIPSIS
{'Mode': <class 'stream.Mode'>, 'Sink': <class 'stream.Sink'>, ...}

Example 1: stream.StreamListenerError, a product type

Drilling down further, let's consider the definition of StreamListenerError, which appears in the source as

StreamListenerError = <stream-listener-error @spec any @message string> .

This reads, in the Preserves Schema language, as the definition of a simple product type (record, class, object) with two named fields spec and message. Parsing a value into a StreamListenerError succeeds only if the value is a record, its label is stream-listener-error, it has exactly two fields, and the second field (message) is a string.

>>> bundle.stream.StreamListenerError
<class 'stream.StreamListenerError'>

The StreamListenerError class includes a decode method that analyzes an input value:

>>> bundle.stream.StreamListenerError.decode(
...     parse('<stream-listener-error <xyz> "an error">'))
StreamListenerError {'spec': #xyz(), 'message': 'an error'}

If invalid input is supplied, decode will raise SchemaDecodeFailed, which includes helpful information for diagnosing the problem (as we will see below, this is especially useful for parsers for sum types):

>>> bundle.stream.StreamListenerError.decode(
...     parse('<i-am-invalid>'))
Traceback (most recent call last):
  ...
preserves.schema.SchemaDecodeFailed: Could not decode i-am-invalid using <class 'stream.StreamListenerError'>
Most likely reason: in stream.StreamListenerError: <lit stream-listener-error> didn't match i-am-invalid
Full explanation: 
  in stream.StreamListenerError: <lit stream-listener-error> didn't match i-am-invalid

Alternatively, the try_decode method catches SchemaDecodeFailed, transforming it into None:

>>> bundle.stream.StreamListenerError.try_decode(
...     parse('<stream-listener-error <xyz> "an error">'))
StreamListenerError {'spec': #xyz(), 'message': 'an error'}
>>> bundle.stream.StreamListenerError.try_decode(
...     parse('<i-am-invalid>'))

The class can also be instantiated directly:

>>> err = bundle.stream.StreamListenerError(Record(Symbol('xyz'), []), 'an error')
>>> err
StreamListenerError {'spec': #xyz(), 'message': 'an error'}

The fields and contents of instances can be queried:

>>> err.spec
#xyz()
>>> err.message
'an error'

And finally, instances can of course be serialized and encoded:

>>> print(stringify(err))
<stream-listener-error <xyz> "an error">
>>> canonicalize(err)
b'\xb4\xb3\x15stream-listener-error\xb4\xb3\x03xyz\x84\xb1\x08an error\x84'
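The three artifacts a schema compiler produces (a type, a partial parser, a total serializer) can be illustrated without the library. The following is a hand-written sketch, not the code that preserves.schema generates: it models a record as a plain (label, fields) tuple, and ToyStreamListenerError and ToyDecodeFailed are hypothetical stand-ins introduced only for this illustration.

```python
from dataclasses import dataclass
from typing import Any


class ToyDecodeFailed(ValueError):
    """Stand-in for SchemaDecodeFailed in this sketch."""


@dataclass(frozen=True)
class ToyStreamListenerError:
    spec: Any        # `@spec any`: any value is accepted
    message: str     # `@message string`: must be a string

    LABEL = 'stream-listener-error'   # unannotated, so not a dataclass field

    @classmethod
    def decode(cls, value):
        # Partial: only (label, [spec, message]) tuples with the right label parse.
        if not (isinstance(value, tuple) and len(value) == 2):
            raise ToyDecodeFailed('not a record: %r' % (value,))
        label, fields = value
        if label != cls.LABEL:
            raise ToyDecodeFailed('label %r did not match %r' % (label, cls.LABEL))
        if len(fields) != 2 or not isinstance(fields[1], str):
            raise ToyDecodeFailed('expected exactly two fields, the second a string')
        return cls(fields[0], fields[1])

    @classmethod
    def try_decode(cls, value):
        # Same as decode, but failure becomes None instead of an exception.
        try:
            return cls.decode(value)
        except ToyDecodeFailed:
            return None

    def encode(self):
        # Total: every instance maps back to a record-shaped tuple.
        return (self.LABEL, [self.spec, self.message])
```

Because the constructor takes exactly the fields the record needs, any instance can be encoded, while decode rejects anything that does not have the expected shape.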

Example 2: stream.Mode, a sum type

Now let's consider the definition of Mode, which appears in the source as

Mode = =bytes / @lines LineMode / <packet @size int> / <object @description any> .

This reads, in the Preserves Schema language, as an alternation (disjoint union, variant, sum type) of four possible kinds of value: the symbol bytes; a LineMode value; a record with packet as its label and an integer as its only field; or a record with object as its label and any kind of value as its only field. In Python, this becomes:

>>> bundle.stream.Mode.bytes
<class 'stream.Mode.bytes'>
>>> bundle.stream.Mode.lines
<class 'stream.Mode.lines'>
>>> bundle.stream.Mode.packet
<class 'stream.Mode.packet'>
>>> bundle.stream.Mode.object
<class 'stream.Mode.object'>

As before, Mode includes a decode method that analyzes an input value:

>>> bundle.stream.Mode.decode(parse('bytes'))
Mode.bytes()
>>> bundle.stream.Mode.decode(parse('lf'))
Mode.lines(LineMode.lf())
>>> bundle.stream.Mode.decode(parse('<packet 123>'))
Mode.packet {'size': 123}
>>> bundle.stream.Mode.decode(parse('<object "?">'))
Mode.object {'description': '?'}

Invalid input causes SchemaDecodeFailed to be raised:

>>> bundle.stream.Mode.decode(parse('<i-am-not-a-valid-mode>'))
Traceback (most recent call last):
  ...
preserves.schema.SchemaDecodeFailed: Could not decode <i-am-not-a-valid-mode> using <class 'stream.Mode'>
Most likely reason: in stream.LineMode.crlf: <lit crlf> didn't match <i-am-not-a-valid-mode>
Full explanation: 
  in stream.Mode: matching <i-am-not-a-valid-mode>
    in stream.Mode.bytes: <lit bytes> didn't match <i-am-not-a-valid-mode>
    in stream.Mode.lines: <ref [] LineMode> didn't match <i-am-not-a-valid-mode>
      in stream.LineMode: matching <i-am-not-a-valid-mode>
        in stream.LineMode.lf: <lit lf> didn't match <i-am-not-a-valid-mode>
        in stream.LineMode.crlf: <lit crlf> didn't match <i-am-not-a-valid-mode>
    in stream.Mode.packet: <lit packet> didn't match i-am-not-a-valid-mode
    in stream.Mode.object: <lit object> didn't match i-am-not-a-valid-mode

The "full explanation" includes details on which parses were attempted, and why they failed.
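The nested shape of the full explanation follows from trying each alternative in turn and keeping every failure reason. A minimal sketch of that idea, assuming nothing about the library's internals (ToyDecodeFailed, match_literal and decode_alternation are hypothetical names, and values are plain strings rather than Preserves symbols):

```python
class ToyDecodeFailed(ValueError):
    """Carries the list of reasons collected while trying alternatives."""
    def __init__(self, reasons):
        super().__init__('; '.join(reasons))
        self.reasons = reasons


def match_literal(expected):
    """Build a matcher that accepts exactly one literal value."""
    def matcher(value):
        if value != expected:
            raise ToyDecodeFailed(["<lit %s> didn't match %r" % (expected, value)])
        return expected
    return matcher


def decode_alternation(variants, value):
    """Try each (name, matcher) pair in order; return the first success as (name, result)."""
    reasons = []
    for name, matcher in variants:
        try:
            return (name, matcher(value))
        except ToyDecodeFailed as e:
            # Prefix nested reasons with the variant that was being tried.
            reasons.extend('in %s: %s' % (name, r) for r in e.reasons)
    raise ToyDecodeFailed(reasons)


LINE_MODE = [('lf', match_literal('lf')), ('crlf', match_literal('crlf'))]
MODE = [
    ('bytes', match_literal('bytes')),
    ('lines', lambda v: decode_alternation(LINE_MODE, v)),
]
```

Calling decode_alternation(MODE, 'lf') succeeds through the nested LINE_MODE alternation, while an unknown value raises ToyDecodeFailed with one reason per literal that was attempted.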

Again, the try_decode method catches SchemaDecodeFailed, transforming it into None:

>>> bundle.stream.Mode.try_decode(parse('bytes'))
Mode.bytes()
>>> bundle.stream.Mode.try_decode(parse('<i-am-not-a-valid-mode>'))

Direct instantiation is done with the variant classes, not with Mode itself:

>>> bundle.stream.Mode.bytes()
Mode.bytes()
>>> bundle.stream.Mode.lines(bundle.stream.LineMode.lf())
Mode.lines(LineMode.lf())
>>> bundle.stream.Mode.packet(123)
Mode.packet {'size': 123}
>>> bundle.stream.Mode.object('?')
Mode.object {'description': '?'}

Fields and contents can be queried as usual:

>>> bundle.stream.Mode.lines(bundle.stream.LineMode.lf()).value
LineMode.lf()
>>> bundle.stream.Mode.packet(123).size
123
>>> bundle.stream.Mode.object('?').description
'?'

And serialization and encoding are also as expected:

>>> print(stringify(bundle.stream.Mode.bytes()))
bytes
>>> print(stringify(bundle.stream.Mode.lines(bundle.stream.LineMode.lf())))
lf
>>> print(stringify(bundle.stream.Mode.packet(123)))
<packet 123>
>>> print(stringify(bundle.stream.Mode.object('?')))
<object "?">
>>> canonicalize(bundle.stream.Mode.object('?'))
b'\xb4\xb3\x06object\xb1\x01?\x84'

Finally, the VARIANT attribute of instances allows code to dispatch on what kind of data it is handling at a given moment:

>>> bundle.stream.Mode.bytes().VARIANT
#bytes
>>> bundle.stream.Mode.lines(bundle.stream.LineMode.lf()).VARIANT
#lines
>>> bundle.stream.Mode.packet(123).VARIANT
#packet
>>> bundle.stream.Mode.object('?').VARIANT
#object
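Dispatching on VARIANT usually amounts to a lookup from the variant tag to a handler. The sketch below shows the pattern with stand-in classes (ToyBytes, ToyPacket and describe are hypothetical); it uses plain strings as tags, whereas the real VARIANT attribute holds a Symbol, so real code would key the table on Symbol values instead.

```python
class ToyBytes:
    VARIANT = 'bytes'


class ToyPacket:
    VARIANT = 'packet'

    def __init__(self, size):
        self.size = size


def describe(mode):
    """Pick a handler based on the instance's VARIANT tag."""
    handlers = {
        'bytes': lambda m: 'raw byte stream',
        'packet': lambda m: 'packets of %d bytes' % m.size,
    }
    handler = handlers.get(mode.VARIANT)
    if handler is None:
        # Fail loudly on variants the table does not cover.
        raise ValueError('unhandled variant: %r' % (mode.VARIANT,))
    return handler(mode)
```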

dumps = stringify module-attribute

This alias for stringify provides a familiar pythonesque name for converting a Preserves Value to a string.

loads = parse module-attribute

This alias for parse provides a familiar pythonesque name for converting a string to a Preserves Value.

meta = load_schema_file(__metaschema_filename).schema module-attribute

Schema module Namespace corresponding to Preserves Schema's metaschema.

Annotated(item)

Bases: object

A Preserves Value along with a sequence of Values annotating it. Compares equal to the underlying Value, ignoring the annotations. See the specification document for more about annotations.

>>> import preserves
>>> a = preserves.parse('''
... # A comment
... [1 2 3]
... ''', include_annotations=True)
>>> a
@'A comment' (1, 2, 3)
>>> a.item
(1, 2, 3)
>>> a.annotations
['A comment']
>>> a == (1, 2, 3)
True
>>> a == preserves.parse('@xyz [1 2 3]', include_annotations=True)
True
>>> a[0]
Traceback (most recent call last):
  ...
TypeError: 'Annotated' object is not subscriptable
>>> a.item[0]
1
>>> type(a.item[0])
<class 'preserves.values.Annotated'>
>>> a.item[0].annotations
[]
>>> print(preserves.stringify(a))
@"A comment" [1 2 3]
>>> print(preserves.stringify(a, include_annotations=False))
[1 2 3]

Attributes:

  • item (Value): the underlying annotated Value

  • annotations (list[Value]): the annotations attached to self.item

Source code in preserves/values.py
def __init__(self, item):
    self.annotations = []
    self.item = item
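The "compares equal to the underlying Value" behaviour can be pictured as an __eq__ that unwraps both sides before comparing. This is a stand-alone sketch of that idea, not the implementation in preserves/values.py; ToyAnnotated is a hypothetical name.

```python
class ToyAnnotated:
    def __init__(self, item):
        self.annotations = []
        self.item = item

    def __eq__(self, other):
        # Unwrap another ToyAnnotated, then compare only the underlying items.
        if isinstance(other, ToyAnnotated):
            other = other.item
        return self.item == other

    def __hash__(self):
        # Keep hashing consistent with equality: annotations play no part.
        return hash(self.item)
```

Two wrappers with different annotations but the same item therefore compare equal and hash identically.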

peel()

Calls strip_annotations on self with depth=1.

Source code in preserves/values.py
def peel(self):
    """Calls [strip_annotations][preserves.values.strip_annotations] on `self` with `depth=1`."""
    return strip_annotations(self, 1)

strip(depth=inf)

Calls strip_annotations on self and depth.

Source code in preserves/values.py
def strip(self, depth=inf):
    """Calls [strip_annotations][preserves.values.strip_annotations] on `self` and `depth`."""
    return strip_annotations(self, depth)

Compiler()

Instances of Compiler populate an initially-empty Namespace by loading and compiling schema bundle files.

>>> c = Compiler()
>>> c.load('docs/syndicate-protocols-schema-bundle.bin')
>>> type(c.root)
<class 'preserves.schema.Namespace'>

Attributes:

  • root (Namespace): the root namespace into which top-level schema modules are installed.

Source code in preserves/schema.py
def __init__(self):
    self.root = Namespace(())

load(filename)

Opens the file at filename, passing the resulting file object to load_filelike.

Source code in preserves/schema.py
def load(self, filename):
    """Opens the file at `filename`, passing the resulting file object to
    [load_filelike][preserves.schema.Compiler.load_filelike]."""
    filename = pathlib.Path(filename)
    with open(filename, 'rb') as f:
        self.load_filelike(f, filename.stem)

load_filelike(f, module_name=None)

Reads a meta.Bundle or meta.Schema from the filelike object f, compiling and installing it in self.root. If f contains a bundle, module_name is not used, since the schema modules in the bundle know their own names; if f contains a plain schema module, however, module_name is used directly if it is a string, and if it is None, a suitable module name is computed from the name attribute of f, if it is present. If name is absent in that case, ValueError is raised.

Source code in preserves/schema.py
def load_filelike(self, f, module_name=None):
    """Reads a `meta.Bundle` or `meta.Schema` from the filelike object `f`, compiling and
    installing it in `self.root`. If `f` contains a bundle, `module_name` is not used,
    since the schema modules in the bundle know their own names; if `f` contains a plain
    schema module, however, `module_name` is used directly if it is a string, and if it is
    `None`, a suitable module name is computed from the `name` attribute of `f`, if it is
    present. If `name` is absent in that case, `ValueError` is raised.

    """
    x = Decoder(f.read()).next()
    if x.key == SCHEMA:
        if module_name is None:
            if hasattr(f, 'name'):
                module_name = pathlib.Path(f.name).stem
            else:
                raise ValueError('Cannot load schema module from filelike object without a module_name')
        self.load_schema((Symbol(module_name),), x)
    elif x.key == BUNDLE:
        for (p, s) in x[0].items():
            self.load_schema(p, s)
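The module-name fallback for plain schema modules can be exercised in isolation. The sketch below copies only that decision into a hypothetical helper, resolve_module_name, and uses a small NamedBytesIO class (also hypothetical) to imitate a file object that carries a name; it does not read or compile any schema, and for bundles the real method ignores module_name altogether.

```python
import io
import pathlib


def resolve_module_name(f, module_name=None):
    """Mirror the name resolution used when `f` holds a plain schema module."""
    if module_name is not None:
        return module_name                      # an explicit name wins
    if hasattr(f, 'name'):
        return pathlib.Path(f.name).stem        # 'schemas/stream.bin' -> 'stream'
    raise ValueError('Cannot load schema module from filelike object without a module_name')


class NamedBytesIO(io.BytesIO):
    """In-memory file that also carries a `name`, as real file objects do."""
    def __init__(self, data, name):
        super().__init__(data)
        self.name = name
```

A bare io.BytesIO has no name attribute, so passing one without module_name ends in the ValueError branch.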

DecodeError

Bases: ValueError

Raised whenever preserves.binary.Decoder or preserves.text.Parser detect invalid input.

Decoder(packet=b'', include_annotations=False, decode_embedded=lambda x: x)

Bases: BinaryCodec

Implementation of a decoder for the machine-oriented binary Preserves syntax.

Parameters:

  • packet (bytes, default b''): initial contents of the input buffer; may subsequently be extended by calling extend.

  • include_annotations (bool, default False): if True, wrap each value and subvalue in an Annotated object.

  • decode_embedded (default lambda x: x): function accepting a Value and returning a possibly-decoded form of that value suitable for placing into an Embedded object.

Normal usage is to supply a buffer, and keep calling next until a ShortPacket exception is raised:

>>> d = Decoder(b'\xb0\x01{\xb1\x05hello\x85\xb3\x01x\xb5\x84')
>>> d.next()
123
>>> d.next()
'hello'
>>> d.next()
()
>>> d.next()
Traceback (most recent call last):
  ...
preserves.error.ShortPacket: Short packet

Alternatively, keep calling try_next until it yields None, which is not in the domain of Preserves Values:

>>> d = Decoder(b'\xb0\x01{\xb1\x05hello\x85\xb3\x01x\xb5\x84')
>>> d.try_next()
123
>>> d.try_next()
'hello'
>>> d.try_next()
()
>>> d.try_next()

For convenience, Decoder implements the iterator interface, backing it with try_next, so you can simply iterate over all complete values in an input:

>>> d = Decoder(b'\xb0\x01{\xb1\x05hello\x85\xb3\x01x\xb5\x84')
>>> list(d)
[123, 'hello', ()]
>>> for v in Decoder(b'\xb0\x01{\xb1\x05hello\x85\xb3\x01x\xb5\x84'):
...     print(repr(v))
123
'hello'
()

Supply include_annotations=True to read annotations alongside the annotated values:

>>> d = Decoder(b'\xb0\x01{\xb1\x05hello\x85\xb3\x01x\xb5\x84', include_annotations=True)
>>> list(d)
[123, 'hello', @#x ()]

If you are incrementally reading from, say, a socket, you can use extend to add new input as it becomes available:

>>> d = Decoder(b'\xb0\x01{\xb1\x05he')
>>> d.try_next()
123
>>> d.try_next() # returns None because the input is incomplete
>>> d.extend(b'llo')
>>> d.try_next()
'hello'
>>> d.try_next()

Attributes:

  • packet (bytes): buffered input waiting to be processed

  • index (int): read position within packet

Source code in preserves/binary.py
def __init__(self, packet=b'', include_annotations=False, decode_embedded=lambda x: x):
    super(Decoder, self).__init__()
    self.packet = packet
    self.index = 0
    self.include_annotations = include_annotations
    self.decode_embedded = decode_embedded

extend(data)

Appends data to the remaining bytes in self.packet, trimming already-processed bytes from the front of self.packet and resetting self.index to zero.

Source code in preserves/binary.py
def extend(self, data):
    """Appends `data` to the remaining bytes in `self.packet`, trimming already-processed
    bytes from the front of `self.packet` and resetting `self.index` to zero."""
    self.packet = self.packet[self.index:] + data
    self.index = 0

next()

Reads the next complete Value from the internal buffer, raising ShortPacket if too few bytes are available, or DecodeError if the input is invalid somehow.

Source code in preserves/binary.py
def next(self):
    """Reads the next complete `Value` from the internal buffer, raising
    [ShortPacket][preserves.error.ShortPacket] if too few bytes are available, or
    [DecodeError][preserves.error.DecodeError] if the input is invalid somehow.

    """
    tag = self.nextbyte()
    if tag == 0x80: return self.wrap(False)
    if tag == 0x81: return self.wrap(True)
    if tag == 0x84: raise DecodeError('Unexpected end-of-stream marker')
    if tag == 0x85:
        a = self.next()
        v = self.next()
        return self.unshift_annotation(a, v)
    if tag == 0x86:
        if self.decode_embedded is None:
            raise DecodeError('No decode_embedded function supplied')
        return self.wrap(Embedded(self.decode_embedded(self.next())))
    if tag == 0x87:
        count = self.nextbyte()
        if count == 8: return self.wrap(struct.unpack('>d', self.nextbytes(8))[0])
        raise DecodeError('Invalid IEEE754 size')
    if tag == 0xb0: return self.wrap(self.nextint(self.varint()))
    if tag == 0xb1: return self.wrap(self.nextbytes(self.varint()).decode('utf-8'))
    if tag == 0xb2: return self.wrap(self.nextbytes(self.varint()))
    if tag == 0xb3: return self.wrap(Symbol(self.nextbytes(self.varint()).decode('utf-8')))
    if tag == 0xb4:
        vs = self.nextvalues()
        if not vs: raise DecodeError('Too few elements in encoded record')
        return self.wrap(Record(vs[0], vs[1:]))
    if tag == 0xb5: return self.wrap(tuple(self.nextvalues()))
    if tag == 0xb6:
        vs = self.nextvalues()
        s = frozenset(vs)
        if len(s) != len(vs): raise DecodeError('Duplicate value')
        return self.wrap(s)
    if tag == 0xb7: return self.wrap(ImmutableDict.from_kvs(self.nextvalues()))
    raise DecodeError('Invalid tag: ' + hex(tag))
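To see how the tag dispatch relates to the byte strings used in the examples, here is a stand-alone sketch that decodes only booleans, integers, strings and byte strings. It is not the library's decoder. Two details are assumptions, since the helper methods varint and nextint are not shown in the listing: lengths are read as base-128 varints with the low-order group first, and integer payloads are read as big-endian two's complement. read_varint and decode_one are hypothetical names.

```python
def read_varint(buf, i):
    """Read a base-128 varint (low-order group first) at buf[i]; return (value, new_index)."""
    shift = 0
    value = 0
    while True:
        b = buf[i]
        i += 1
        value |= (b & 0x7f) << shift
        if b < 0x80:                 # high bit clear: this was the last group
            return value, i
        shift += 7


def decode_one(buf, i=0):
    """Decode one value at buf[i] for a handful of tags; return (value, new_index)."""
    tag = buf[i]
    i += 1
    if tag == 0x80: return False, i
    if tag == 0x81: return True, i
    if tag in (0xb0, 0xb1, 0xb2):
        n, i = read_varint(buf, i)
        payload = bytes(buf[i:i + n])
        if len(payload) < n: raise ValueError('short packet')
        i += n
        if tag == 0xb0: return int.from_bytes(payload, 'big', signed=True), i
        if tag == 0xb1: return payload.decode('utf-8'), i
        return payload, i
    raise ValueError('tag not handled in this sketch: ' + hex(tag))
```

Applied to the first ten bytes of the buffer used in the Decoder examples, b'\xb0\x01{\xb1\x05hello', the sketch reads the integer whose single payload byte is b'{' and then the string 'hello'.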

try_next()

Like next, but returns None instead of raising ShortPacket.

Source code in preserves/binary.py
def try_next(self):
    """Like [next][preserves.binary.Decoder.next], but returns `None` instead of raising
    [ShortPacket][preserves.error.ShortPacket]."""
    start = self.index
    try:
        return self.next()
    except ShortPacket:
        self.index = start
        return None
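The interplay between extend, next and try_next is easier to follow on a toy format. The sketch below assumes a made-up encoding (one length byte followed by that many payload bytes) rather than the Preserves binary syntax; ToyDecoder and ShortInput are hypothetical stand-ins for Decoder and ShortPacket, and only the buffering logic mirrors the listings above.

```python
class ShortInput(Exception):
    """Stand-in for ShortPacket in this sketch."""


class ToyDecoder:
    def __init__(self, packet=b''):
        self.packet = packet
        self.index = 0

    def extend(self, data):
        # Drop consumed bytes, keep the unread tail, then append the new data.
        self.packet = self.packet[self.index:] + data
        self.index = 0

    def next(self):
        # Toy format: one length byte followed by that many payload bytes.
        if self.index >= len(self.packet):
            raise ShortInput()
        n = self.packet[self.index]
        self.index += 1              # advances before the payload is known to be complete
        end = self.index + n
        if end > len(self.packet):
            raise ShortInput()
        payload = self.packet[self.index:end]
        self.index = end
        return payload

    def try_next(self):
        start = self.index
        try:
            return self.next()
        except ShortInput:
            self.index = start       # rewind so the partial value is re-read later
            return None
```

The rewind in try_next is what makes incremental reading safe: a half-received value leaves index where it was, so a later extend followed by try_next sees the value from its first byte.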

Definition(*args, **kwargs)

Bases: SchemaObject

Subclasses of Definition are used to represent both standalone non-alternation definitions and alternatives within an Enumeration.

>>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')

>>> bundle.stream.StreamListenerError.FIELD_NAMES
['spec', 'message']
>>> bundle.stream.StreamListenerError.SAFE_FIELD_NAMES
['spec', 'message']
>>> bundle.stream.StreamListenerError.ENUMERATION is None
True

>>> bundle.stream.Mode.object.FIELD_NAMES
['description']
>>> bundle.stream.Mode.object.SAFE_FIELD_NAMES
['description']
>>> bundle.stream.Mode.object.ENUMERATION is bundle.stream.Mode
True

>>> bundle.stream.CreditAmount.count.FIELD_NAMES
[]
>>> bundle.stream.CreditAmount.count.SAFE_FIELD_NAMES
[]
>>> bundle.stream.CreditAmount.count.ENUMERATION is bundle.stream.CreditAmount
True

>>> bundle.stream.CreditAmount.decode(parse('123'))
CreditAmount.count(123)
>>> bundle.stream.CreditAmount.count(123)
CreditAmount.count(123)
>>> bundle.stream.CreditAmount.count(123).value
123

Source code in preserves/schema.py
def __init__(self, *args, **kwargs):
    self._fields = args
    if self.SIMPLE:
        if self.EMPTY:
            if len(args) != 0:
                raise TypeError('%s takes no arguments' % (self._constructor_name(),))
        else:
            if len(args) != 1:
                raise TypeError('%s needs exactly one argument' % (self._constructor_name(),))
            self.value = args[0]
    else:
        i = 0
        for arg in args:
            if i >= len(self.FIELD_NAMES):
                raise TypeError('%s given too many positional arguments' % (self._constructor_name(),))
            setattr(self, self.SAFE_FIELD_NAMES[i], arg)
            i = i + 1
        for (argname, arg) in kwargs.items():
            if hasattr(self, argname):
                raise TypeError('%s given duplicate attribute: %r' % (self._constructor_name, argname))
            if argname not in self.SAFE_FIELD_NAMES:
                raise TypeError('%s given unknown attribute: %r' % (self._constructor_name, argname))
            setattr(self, argname, arg)
            i = i + 1
        if i != len(self.FIELD_NAMES):
            raise TypeError('%s needs argument(s) %r' % (self._constructor_name(), self.FIELD_NAMES))

ENUMERATION = None class-attribute instance-attribute

None for standalone top-level definitions within a module; otherwise, an Enumeration subclass representing a top-level alternation definition.

FIELD_NAMES = [] class-attribute instance-attribute

List of strings: names of the fields contained within this definition, if it has named fields at all; otherwise, an empty list, and the definition is a simple wrapper for another value, in which case that value is accessed via the value attribute.

SAFE_FIELD_NAMES = [] class-attribute instance-attribute

The list produced by mapping safeattrname over FIELD_NAMES.

Embedded(embeddedValue)

Representation of a Preserves Embedded value. For more on the meaning and use of embedded values, see the specification.

>>> import io
>>> e = Embedded(io.StringIO('some text'))
>>> e                                        # doctest: +ELLIPSIS
#:<_io.StringIO object at ...>
>>> e.embeddedValue                          # doctest: +ELLIPSIS
<_io.StringIO object at ...>
>>> import preserves
>>> print(preserves.stringify(Embedded(None)))
Traceback (most recent call last):
  ...
TypeError: Cannot preserves-format: None
>>> print(preserves.stringify(Embedded(None), format_embedded=lambda x: 'abcdef'))
#:"abcdef"

Attributes:

  • embeddedValue: any Python value; could be a platform object, could be a representation of a Preserves Value, could be None, could be anything!

Source code in preserves/values.py
def __init__(self, embeddedValue):
    self.embeddedValue = embeddedValue

EncodeError

Bases: ValueError

Raised whenever preserves.binary.Encoder or preserves.text.Formatter are unable to proceed.

Encoder(encode_embedded=lambda x: x, canonicalize=False, include_annotations=None)

Bases: BinaryCodec

Implementation of an encoder for the machine-oriented binary Preserves syntax.

>>> e = Encoder()
>>> e.append(123)
>>> e.append('hello')
>>> e.append(annotate([], Symbol('x')))
>>> e.contents()
b'\xb0\x01{\xb1\x05hello\x85\xb3\x01x\xb5\x84'

Parameters:

  • encode_embedded (default lambda x: x): function accepting an Embedded.embeddedValue and returning a Value for serialization.

  • canonicalize (bool, default False): if True, ensures the serialized data are in canonical form. This is slightly more work than producing potentially-non-canonical output.

  • include_annotations (bool | None, default None): if None, annotations are included in the output only when canonicalize is False, because canonical serialization of values demands omission of annotations. If explicitly True or False, however, annotations are included or excluded, respectively, regardless of the canonicalize setting. This can be used to get canonical ordering (canonicalize=True) together with annotations (include_annotations=True).

Attributes:

  • buffer (bytearray): accumulator for the output of the encoder

Source code in preserves/binary.py
def __init__(self,
             encode_embedded=lambda x: x,
             canonicalize=False,
             include_annotations=None):
    super(Encoder, self).__init__()
    self.buffer = bytearray()
    self._encode_embedded = encode_embedded
    self._canonicalize = canonicalize
    if include_annotations is None:
        self.include_annotations = not self._canonicalize
    else:
        self.include_annotations = include_annotations
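The three-valued include_annotations parameter reduces to a small decision, reproduced here as a hypothetical free function (resolve_include_annotations is not part of the library) so that all four combinations can be checked at a glance:

```python
def resolve_include_annotations(canonicalize=False, include_annotations=None):
    """Return the effective setting, following the constructor logic shown above."""
    if include_annotations is None:
        # Default: annotations are kept unless canonical output was requested.
        return not canonicalize
    # An explicit True/False always wins, whatever `canonicalize` says.
    return include_annotations
```

With canonicalize=True and no explicit choice, annotations are dropped; passing include_annotations=True alongside canonicalize=True keeps them.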

append(v)

Extend self.buffer with an encoding of v.

Source code in preserves/binary.py
def append(self, v):
    """Extend `self.buffer` with an encoding of `v`."""
    v = preserve(v)
    if hasattr(v, '__preserve_write_binary__'):
        v.__preserve_write_binary__(self)
    elif v is False:
        self.buffer.append(0x80)
    elif v is True:
        self.buffer.append(0x81)
    elif isinstance(v, float):
        self.buffer.append(0x87)
        self.buffer.append(8)
        self.buffer.extend(struct.pack('>d', v))
    elif isinstance(v, numbers.Number):
        self.encodeint(v)
    elif isinstance(v, bytes):
        self.encodebytes(0xb2, v)
    elif isinstance(v, basestring_):
        self.encodebytes(0xb1, v.encode('utf-8'))
    elif isinstance(v, list):
        self.encodevalues(0xb5, v)
    elif isinstance(v, tuple):
        self.encodevalues(0xb5, v)
    elif isinstance(v, set):
        self.encodeset(v)
    elif isinstance(v, frozenset):
        self.encodeset(v)
    elif isinstance(v, dict):
        self.encodedict(v)
    else:
        try:
            i = iter(v)
        except TypeError:
            i = None
        if i is None:
            self.cannot_encode(v)
        else:
            self.encodevalues(0xb5, i)

contents()

Returns a bytes constructed from the contents of self.buffer.

Source code in preserves/binary.py
def contents(self):
    """Returns a `bytes` constructed from the contents of `self.buffer`."""
    return bytes(self.buffer)

reset()

Clears self.buffer to a fresh empty bytearray.

Source code in preserves/binary.py
def reset(self):
    """Clears `self.buffer` to a fresh empty `bytearray`."""
    self.buffer = bytearray()

Enumeration()

Bases: SchemaObject

Subclasses of Enumeration represent a group of variant options within a sum type.

>>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')

>>> import pprint
>>> pprint.pprint(bundle.stream.Mode.VARIANTS)
[(#bytes, <class 'stream.Mode.bytes'>),
 (#lines, <class 'stream.Mode.lines'>),
 (#packet, <class 'stream.Mode.packet'>),
 (#object, <class 'stream.Mode.object'>)]

>>> bundle.stream.Mode.VARIANTS[0][1] is bundle.stream.Mode.bytes
True

Source code in preserves/schema.py
def __init__(self):
    raise TypeError('Cannot create instance of Enumeration')

VARIANTS = None class-attribute instance-attribute

List of (Symbol, SchemaObject class) tuples representing the possible options within this sum type.

Formatter(format_embedded=lambda x: x, indent=None, with_commas=False, trailing_comma=False, include_annotations=True)

Bases: TextCodec

Printer (and indenting pretty-printer) for producing human-readable syntax from Preserves Values.

>>> f = Formatter()
>>> f.append({'a': 1, 'b': 2})
>>> f.append(Record(Symbol('label'), ['field1', ['field2item1', 'field2item2']]))
>>> print(f.contents())
{"a": 1 "b": 2} <label "field1" ["field2item1" "field2item2"]>

>>> f = Formatter(indent=4)
>>> f.append({'a': 1, 'b': 2})
>>> f.append(Record(Symbol('label'), ['field1', ['field2item1', 'field2item2']]))
>>> print(f.contents())
{
    "a": 1
    "b": 2
}
<label "field1" [
    "field2item1"
    "field2item2"
]>

Parameters:

  • format_embedded (default lambda x: x): function accepting an Embedded.embeddedValue and returning a Value for serialization.

  • indent (int | None, default None): None disables indented pretty-printing; otherwise, an int specifies indentation per nesting-level.

  • with_commas (bool, default False): True causes commas to separate sequence and set items and dictionary entries; False omits commas.

  • trailing_comma (bool, default False): True causes a comma to be printed after the final item or entry in a sequence, set or dictionary; False omits this trailing comma.

  • include_annotations (bool, default True): True causes annotations to be included in the output; False causes them to be omitted.

Attributes:

  • indent_delta (int): indentation per nesting-level

  • chunks (list[str]): fragments of output

Source code in preserves/text.py
def __init__(self,
             format_embedded=lambda x: x,
             indent=None,
             with_commas=False,
             trailing_comma=False,
             include_annotations=True):
    super(Formatter, self).__init__()
    self.indent_delta = 0 if indent is None else indent
    self.indent_distance = 0
    self.nesting = 0
    self.with_commas = with_commas
    self.trailing_comma = trailing_comma
    self.chunks = []
    self._format_embedded = format_embedded
    self.include_annotations = include_annotations

append(v)

Extend self.chunks with at least one chunk, together making up the text representation of v.

Source code in preserves/text.py
def append(self, v):
    """Extend `self.chunks` with at least one chunk, together making up the text
    representation of `v`."""
    if self.chunks and self.nesting == 0:
        self.write_indent_space()
    try:
        self.nesting += 1
        self._append(v)
    finally:
        self.nesting -= 1

contents()

Returns a str constructed from the join of the chunks in self.chunks.

Source code in preserves/text.py
def contents(self):
    """Returns a `str` constructed from the join of the chunks in `self.chunks`."""
    return u''.join(self.chunks)

is_indenting()

Returns True iff this Formatter is in pretty-printing indenting mode.

Source code in preserves/text.py
def is_indenting(self):
    """Returns `True` iff this [Formatter][preserves.text.Formatter] is in pretty-printing
    indenting mode."""
    return self.indent_delta > 0

ImmutableDict(*args, **kwargs)

Bases: dict

A subclass of Python's built-in dict that overrides methods that could mutate the dictionary, causing them to raise TypeError('Immutable') if called.

Implements the __hash__ method, allowing ImmutableDict instances to be used wherever immutable data are permitted; in particular, as keys in other dictionaries.

>>> d = ImmutableDict([('a', 1), ('b', 2)])
>>> d
{'a': 1, 'b': 2}
>>> d['c'] = 3
Traceback (most recent call last):
  ...
TypeError: Immutable
>>> del d['b']
Traceback (most recent call last):
  ...
TypeError: Immutable

Source code in preserves/values.py
def __init__(self, *args, **kwargs):
    if hasattr(self, '__hash'): raise TypeError('Immutable')
    super(ImmutableDict, self).__init__(*args, **kwargs)
    self.__hash = None
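The general recipe behind a hashable, read-only dictionary is to block the mutating methods and derive the hash from the entries. The sketch below shows that recipe under a hypothetical name, FrozenDict; it is a simplified illustration and not the ImmutableDict implementation, which (as the constructor above suggests) also keeps a cached hash.

```python
class FrozenDict(dict):
    def _immutable(self, *args, **kwargs):
        raise TypeError('Immutable')

    # Route every mutating method to the same refusal.
    __setitem__ = __delitem__ = __ior__ = _immutable
    clear = pop = popitem = setdefault = update = _immutable

    def __hash__(self):
        # Order-independent hash over the entries; requires hashable values too.
        return hash(frozenset(self.items()))
```

Construction still works because dict's own initializer fills the table without going through the overridden __setitem__, and equal dictionaries hash equally regardless of insertion order.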

from_kvs(kvs) staticmethod

Constructs an ImmutableDict from a sequence of alternating keys and values; compare to the ImmutableDict constructor, which takes a sequence of key-value pairs.

>>> ImmutableDict.from_kvs(['a', 1, 'b', 2])
{'a': 1, 'b': 2}
>>> ImmutableDict.from_kvs(['a', 1, 'b', 2])['c'] = 3
Traceback (most recent call last):
  ...
TypeError: Immutable
Source code in preserves/values.py
@staticmethod
def from_kvs(kvs):
    """Constructs an [ImmutableDict][preserves.values.ImmutableDict] from a sequence of
    alternating keys and values; compare to the
    [ImmutableDict][preserves.values.ImmutableDict] constructor, which takes a sequence of
    key-value pairs.

    ```python
    >>> ImmutableDict.from_kvs(['a', 1, 'b', 2])
    {'a': 1, 'b': 2}
    >>> ImmutableDict.from_kvs(['a', 1, 'b', 2])['c'] = 3
    Traceback (most recent call last):
      ...
    TypeError: Immutable

    ```

    """

    i = iter(kvs)
    result = ImmutableDict()
    result_proxy = super(ImmutableDict, result)
    try:
        while True:
            k = next(i)
            try:
                v = next(i)
            except StopIteration:
                raise DecodeError("Missing dictionary value")
            if k in result:
                raise DecodeError("Duplicate key: " + repr(k))
            result_proxy.__setitem__(k, v)
    except StopIteration:
        pass
    return result
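
The pairing logic in the listing above can be tried without the library. This is a minimal standalone sketch, not the library's code: `pairs_from_kvs` is a hypothetical helper that returns a plain dict, and it raises `ValueError` where the real method raises DecodeError.

```python
def pairs_from_kvs(kvs):
    """Build a dict from a flat sequence of alternating keys and values."""
    it = iter(kvs)
    result = {}
    for k in it:
        try:
            v = next(it)  # the value that must follow every key
        except StopIteration:
            raise ValueError("Missing dictionary value")
        if k in result:
            raise ValueError("Duplicate key: " + repr(k))
        result[k] = v
    return result

paired = pairs_from_kvs(['a', 1, 'b', 2])
```

An odd-length input fails on the final key, and a repeated key fails as soon as it is seen, mirroring the two error cases in the listing.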

Namespace(prefix)

A Namespace is a dictionary-like object representing a schema module that knows its location in a schema module hierarchy and whose attributes correspond to definitions and submodules within the schema module.

Attributes:

_prefix (tuple[Symbol]): path to this module/Namespace from the root Namespace

Source code in preserves/schema.py
def __init__(self, prefix):
    self._prefix = prefix

Parser(input_buffer='', include_annotations=False, parse_embedded=lambda x: x)

Bases: TextCodec

Parser for the human-readable Preserves text syntax.

Parameters:

input_buffer (str, default ''): initial contents of the input buffer; may subsequently be extended by calling extend.

include_annotations (bool, default False): if True, wrap each value and subvalue in an Annotated object.

parse_embedded (default lambda x: x): function accepting a Value and returning a possibly-decoded form of that value suitable for placing into an Embedded object.

Normal usage is to supply input text, and keep calling next until a ShortPacket exception is raised:

>>> d = Parser('123 "hello" @x []')
>>> d.next()
123
>>> d.next()
'hello'
>>> d.next()
()
>>> d.next()
Traceback (most recent call last):
  ...
preserves.error.ShortPacket: Short input buffer

Alternatively, keep calling try_next until it yields None, which is not in the domain of Preserves Values:

>>> d = Parser('123 "hello" @x []')
>>> d.try_next()
123
>>> d.try_next()
'hello'
>>> d.try_next()
()
>>> d.try_next()

For convenience, Parser implements the iterator interface, backing it with try_next, so you can simply iterate over all complete values in an input:

>>> d = Parser('123 "hello" @x []')
>>> list(d)
[123, 'hello', ()]
>>> for v in Parser('123 "hello" @x []'):
...     print(repr(v))
123
'hello'
()

Supply include_annotations=True to read annotations alongside the annotated values:

>>> d = Parser('123 "hello" @x []', include_annotations=True)
>>> list(d)
[123, 'hello', @#x ()]

If you are incrementally reading from, say, a socket, you can use extend to add new input as it becomes available:

>>> d = Parser('123 "he')
>>> d.try_next()
123
>>> d.try_next() # returns None because the input is incomplete
>>> d.extend('llo"')
>>> d.try_next()
'hello'
>>> d.try_next()

Attributes:

input_buffer (str): buffered input waiting to be processed

index (int): read position within input_buffer

Source code in preserves/text.py
def __init__(self, input_buffer=u'', include_annotations=False, parse_embedded=lambda x: x):
    super(Parser, self).__init__()
    self.input_buffer = input_buffer
    self.index = 0
    self.include_annotations = include_annotations
    self.parse_embedded = parse_embedded

extend(text)

Appends text to the remaining contents of self.input_buffer, trimming already-processed text from the front of self.input_buffer and resetting self.index to zero.

Source code in preserves/text.py
def extend(self, text):
    """Appends `text` to the remaining contents of `self.input_buffer`, trimming already-processed
    text from the front of `self.input_buffer` and resetting `self.index` to zero."""
    self.input_buffer = self.input_buffer[self.index:] + text
    self.index = 0

next()

Reads the next complete Value from the internal buffer, raising ShortPacket if too few bytes are available, or DecodeError if the input is invalid somehow.

Source code in preserves/text.py
def next(self):
    """Reads the next complete `Value` from the internal buffer, raising
    [ShortPacket][preserves.error.ShortPacket] if too few bytes are available, or
    [DecodeError][preserves.error.DecodeError] if the input is invalid somehow.

    """
    self.skip_whitespace()
    c = self.peek()
    if c == '"':
        self.skip()
        return self.wrap(self.read_string('"'))
    if c == "'":
        self.skip()
        return self.wrap(Symbol(self.read_string("'")))
    if c == '@':
        self.skip()
        return self.unshift_annotation(self.next(), self.next())
    if c == ';':
        raise DecodeError('Semicolon is reserved syntax')
    if c == ':':
        raise DecodeError('Unexpected key/value separator between items')
    if c == '#':
        self.skip()
        c = self.nextchar()
        if c in ' \t': return self.unshift_annotation(self.comment_line(), self.next())
        if c in '\n\r': return self.unshift_annotation('', self.next())
        if c == '!':
            return self.unshift_annotation(
                Record(Symbol('interpreter'), [self.comment_line()]),
                self.next())
        if c == 'f': self.require_delimiter('#f'); return self.wrap(False)
        if c == 't': self.require_delimiter('#t'); return self.wrap(True)
        if c == '{': return self.wrap(self.read_set())
        if c == '"': return self.wrap(self.read_literal_binary())
        if c == 'x':
            c = self.nextchar()
            if c == '"': return self.wrap(self.read_hex_binary())
            if c == 'd': return self.wrap(self.read_hex_float())
            raise DecodeError('Invalid #x syntax')
        if c == '[': return self.wrap(self.read_base64_binary())
        if c == ':':
            if self.parse_embedded is None:
                raise DecodeError('No parse_embedded function supplied')
            return self.wrap(Embedded(self.parse_embedded(self.next())))
        raise DecodeError('Invalid # syntax')
    if c == '<':
        self.skip()
        vs = self.upto('>', False)
        if len(vs) == 0:
            raise DecodeError('Missing record label')
        return self.wrap(Record(vs[0], vs[1:]))
    if c == '[':
        self.skip()
        return self.wrap(self.upto(']', True))
    if c == '{':
        self.skip()
        return self.wrap(self.read_dictionary())
    if c in '>]},':
        raise DecodeError('Unexpected ' + c)
    self.skip()
    return self.wrap(self.read_raw_symbol_or_number([c]))

try_next()

Like next, but returns None instead of raising ShortPacket.

Source code in preserves/text.py
def try_next(self):
    """Like [next][preserves.text.Parser.next], but returns `None` instead of raising
    [ShortPacket][preserves.error.ShortPacket]."""
    start = self.index
    try:
        return self.next()
    except ShortPacket:
        self.index = start
        return None
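
The interplay of extend, next and try_next can be seen in isolation with a toy reader. Everything here is hypothetical: `LineReader` and `ShortInput` are stand-ins that read newline-terminated lines rather than Preserves text, and only the buffer handling follows the listings above.

```python
class ShortInput(Exception):
    """Stand-in for ShortPacket: the buffer ends before a full item."""

class LineReader:
    """Toy reader returning newline-terminated lines from a growing buffer."""

    def __init__(self, input_buffer=''):
        self.input_buffer = input_buffer
        self.index = 0

    def extend(self, text):
        # Drop consumed text, keep the unread tail, then append new input.
        self.input_buffer = self.input_buffer[self.index:] + text
        self.index = 0

    def next(self):
        end = self.input_buffer.find('\n', self.index)
        if end == -1:
            # Advance past the partial item before failing, so that the
            # rewind in try_next has something to undo.
            self.index = len(self.input_buffer)
            raise ShortInput('Short input buffer')
        line = self.input_buffer[self.index:end]
        self.index = end + 1
        return line

    def try_next(self):
        start = self.index
        try:
            return self.next()
        except ShortInput:
            self.index = start  # rewind so a later retry sees the same text
            return None

reader = LineReader('one\ntw')
first = reader.try_next()    # complete line available
second = reader.try_next()   # 'tw' has no terminating newline yet
reader.extend('o\n')
third = reader.try_next()    # the partial line is now complete
```

Restoring the index on failure is what makes it safe to call extend afterwards: the partial item is still in the unread tail when more input arrives.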

Record(key, fields)

Bases: object

Representation of Preserves Records, which are a pair of a label Value and a sequence of field Values.

>>> r = Record(Symbol('label'), ['field1', ['field2item1', 'field2item2']])
>>> r
#label('field1', ['field2item1', 'field2item2'])
>>> r.key
#label
>>> r.fields
('field1', ['field2item1', 'field2item2'])
>>> import preserves
>>> preserves.stringify(r)
'<label "field1" ["field2item1" "field2item2"]>'
>>> r == preserves.parse('<label "field1" ["field2item1" "field2item2"]>')
True

Parameters:

key (Value, required): the Record's label

fields (iterable[Value], required): the fields of the Record

Attributes:

key (Value): the Record's label

fields (tuple[Value]): the fields of the Record

Source code in preserves/values.py
def __init__(self, key, fields):
    self.key = key
    self.fields = tuple(fields)
    self.__hash = None

makeBasicConstructor(label, fieldNames) staticmethod

Constructs and returns a "constructor" for Records having a certain label and number of fields.

Deprecated

Use preserves.schema definitions instead.

The "constructor" is a callable function that accepts len(fieldNames) arguments and returns a Record with label as its label and the arguments to the constructor as field values.

In addition, the "constructor" has a constructorInfo attribute holding a RecordConstructorInfo object, an isClassOf attribute holding a unary function that returns True iff its argument is a Record with label label and arity len(fieldNames), and an ensureClassOf attribute holding a unary function that raises a TypeError if isClassOf returns false on its argument and returns the argument otherwise.

Finally, for each field name f in fieldNames, the "constructor" object has an attribute _f that is a unary function that retrieves the f field from the passed in argument.

>>> c = Record.makeBasicConstructor(Symbol('date'), 'year month day')
>>> c(1969, 7, 16)
#date(1969, 7, 16)
>>> c.constructorInfo
#date/3
>>> c.isClassOf(c(1969, 7, 16))
True
>>> c.isClassOf(Record(Symbol('date'), [1969, 7, 16]))
True
>>> c.isClassOf(Record(Symbol('date'), [1969]))
False
>>> c.ensureClassOf(c(1969, 7, 16))
#date(1969, 7, 16)
>>> c.ensureClassOf(Record(Symbol('date'), [1969]))
Traceback (most recent call last):
  ...
TypeError: Record: expected #date/3, got #date(1969)
>>> c._year(c(1969, 7, 16))
1969
>>> c._month(c(1969, 7, 16))
7
>>> c._day(c(1969, 7, 16))
16

Parameters:

label (Value, required): Label to use for constructed/matched Records

fieldNames (tuple[str] | list[str] | str, required): Names of the Record's fields

Source code in preserves/values.py
@staticmethod
def makeBasicConstructor(label, fieldNames):
    """Constructs and returns a "constructor" for `Record`s having a certain `label` and
    number of fields.

    Deprecated:
       Use [preserves.schema][] definitions instead.

    The "constructor" is a callable function that accepts `len(fieldNames)` arguments and
    returns a [Record][preserves.values.Record] with `label` as its label and the arguments
    to the constructor as field values.

    In addition, the "constructor" has a `constructorInfo` attribute holding a
    [RecordConstructorInfo][preserves.values.RecordConstructorInfo] object, an `isClassOf`
    attribute holding a unary function that returns `True` iff its argument is a
    [Record][preserves.values.Record] with label `label` and arity `len(fieldNames)`, and
    an `ensureClassOf` attribute that raises an `Exception` if `isClassOf` returns false on
    its argument and returns the argument otherwise.

    Finally, for each field name `f` in `fieldNames`, the "constructor" object has an
    attribute `_f` that is a unary function that retrieves the `f` field from the passed in
    argument.

    ```python
    >>> c = Record.makeBasicConstructor(Symbol('date'), 'year month day')
    >>> c(1969, 7, 16)
    #date(1969, 7, 16)
    >>> c.constructorInfo
    #date/3
    >>> c.isClassOf(c(1969, 7, 16))
    True
    >>> c.isClassOf(Record(Symbol('date'), [1969, 7, 16]))
    True
    >>> c.isClassOf(Record(Symbol('date'), [1969]))
    False
    >>> c.ensureClassOf(c(1969, 7, 16))
    #date(1969, 7, 16)
    >>> c.ensureClassOf(Record(Symbol('date'), [1969]))
    Traceback (most recent call last):
      ...
    TypeError: Record: expected #date/3, got #date(1969)
    >>> c._year(c(1969, 7, 16))
    1969
    >>> c._month(c(1969, 7, 16))
    7
    >>> c._day(c(1969, 7, 16))
    16

    ```

    Args:
        label (Value): Label to use for constructed/matched `Record`s
        fieldNames (tuple[str] | list[str] | str): Names of the `Record`'s fields

    """
    if type(fieldNames) == str:
        fieldNames = fieldNames.split()
    arity = len(fieldNames)
    def ctor(*fields):
        if len(fields) != arity:
            raise Exception("Record: cannot instantiate %r expecting %d fields with %d fields"%(
                label,
                arity,
                len(fields)))
        return Record(label, fields)
    ctor.constructorInfo = RecordConstructorInfo(label, arity)
    ctor.isClassOf = lambda v: \
                     isinstance(v, Record) and v.key == label and len(v.fields) == arity
    def ensureClassOf(v):
        if not ctor.isClassOf(v):
            raise TypeError("Record: expected %r/%d, got %r" % (label, arity, v))
        return v
    ctor.ensureClassOf = ensureClassOf
    for fieldIndex in range(len(fieldNames)):
        fieldName = fieldNames[fieldIndex]
        # Stupid python scoping bites again
        def getter(fieldIndex):
            return lambda v: ensureClassOf(v)[fieldIndex]
        setattr(ctor, '_' + fieldName, getter(fieldIndex))
    return ctor
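
The `getter(fieldIndex)` helper in the listing above exists because Python closures capture variables, not values. The following standalone sketch shows the difference; the function names are hypothetical and nothing here comes from the library.

```python
def build_naive_getters(n):
    getters = []
    for i in range(n):
        # Every lambda refers to the same variable `i`, which holds
        # its final value by the time any lambda is called.
        getters.append(lambda seq: seq[i])
    return getters

def make_getter(index):
    # Each call creates a fresh scope, so each lambda gets its own `index`.
    return lambda seq: seq[index]

def build_fixed_getters(n):
    return [make_getter(i) for i in range(n)]

date = (1969, 7, 16)
naive_results = [g(date) for g in build_naive_getters(3)]
fixed_results = [g(date) for g in build_fixed_getters(3)]
```

The naive getters all return the last field, while the factory-made getters return one field each, which is the behaviour the field accessors need.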

makeConstructor(labelSymbolText, fieldNames) staticmethod

Equivalent to Record.makeBasicConstructor(Symbol(labelSymbolText), fieldNames).

Deprecated

Use preserves.schema definitions instead.

Source code in preserves/values.py
@staticmethod
def makeConstructor(labelSymbolText, fieldNames):
    """
    Equivalent to `Record.makeBasicConstructor(Symbol(labelSymbolText), fieldNames)`.

    Deprecated:
       Use [preserves.schema][] definitions instead.
    """
    return Record.makeBasicConstructor(Symbol(labelSymbolText), fieldNames)

SchemaDecodeFailed(cls, p, v, failures=None)

Bases: ValueError

Raised when decode cannot find a way to parse a given input.

Attributes:

cls (class): the SchemaObject subclass attempting the parse

pattern (Value): the failing pattern, a Value conforming to schema meta.Pattern

value (Value): the unparseable value

failures (list[SchemaDecodeFailed]): descriptions of failed paths attempted during the match this failure describes

Source code in preserves/schema.py
def __init__(self, cls, p, v, failures=None):
    super().__init__()
    self.cls = cls
    self.pattern = p
    self.value = v
    self.failures = [] if failures is None else failures
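
Because failures holds further failure objects, a decode error forms a tree that can be walked for diagnostics. The sketch below assumes nothing beyond the four attributes listed above: `DecodeFailure` is a hypothetical stand-in class, `describe` is a hypothetical helper, and plain strings replace the real classes and patterns.

```python
class DecodeFailure(ValueError):
    """Hypothetical stand-in carrying the same four attributes."""

    def __init__(self, cls, p, v, failures=None):
        super().__init__()
        self.cls = cls
        self.pattern = p
        self.value = v
        self.failures = [] if failures is None else failures

def describe(failure, depth=0):
    """Return one indented line per failure, outermost first."""
    lines = ['  ' * depth + '%s: %r did not match %r' % (
        failure.cls, failure.value, failure.pattern)]
    for child in failure.failures:
        lines.extend(describe(child, depth + 1))
    return lines

inner_a = DecodeFailure('Mode.bytes', 'bytes-literal', 42)
inner_b = DecodeFailure('Mode.packet', 'packet-record', 42)
outer = DecodeFailure('Mode', 'or-pattern', 42, [inner_a, inner_b])

report = describe(outer)
```

Joining `report` with newlines gives an indented summary in which each alternative that was tried appears beneath the failure that tried it.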

SchemaObject

Base class for classes representing grammatical productions in a schema: instances of SchemaObject represent schema definitions. This is an abstract class, as are its subclasses Enumeration and Definition. It is subclasses of those subclasses, automatically produced during schema loading, that are actually instantiated.

>>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')

>>> bundle.stream.Mode.mro()[1:-1]
[<class 'preserves.schema.Enumeration'>, <class 'preserves.schema.SchemaObject'>]

>>> bundle.stream.Mode.packet.mro()[1:-1]
[<class 'stream.Mode._ALL'>, <class 'preserves.schema.Definition'>, <class 'preserves.schema.SchemaObject'>]

>>> bundle.stream.StreamListenerError.mro()[1:-1]
[<class 'preserves.schema.Definition'>, <class 'preserves.schema.SchemaObject'>]

Illustrating the class attributes on SchemaObject subclasses:

>>> bundle.stream.Mode.ROOTNS is bundle
True

>>> print(stringify(bundle.stream.Mode.SCHEMA, indent=2))
<or [
  [
    "bytes"
    <lit bytes>
  ]
  [
    "lines"
    <ref [] LineMode>
  ]
  [
    "packet"
    <rec <lit packet> <tuple [<named size <atom SignedInteger>>]>>
  ]
  [
    "object"
    <rec <lit object> <tuple [<named description any>]>>
  ]
]>

>>> bundle.stream.Mode.MODULE_PATH
(#stream,)

>>> bundle.stream.Mode.NAME
#Mode

>>> bundle.stream.Mode.VARIANT is None
True
>>> bundle.stream.Mode.packet.VARIANT
#packet

MODULE_PATH = None class-attribute instance-attribute

A sequence (tuple) of Symbols naming the path from the root to the schema module containing this definition.

NAME = None class-attribute instance-attribute

A Symbol naming this definition within its module.

ROOTNS = None class-attribute instance-attribute

A Namespace that is the top-level environment for all bundles included in the Compiler run that produced this SchemaObject.

SCHEMA = None class-attribute instance-attribute

A Value conforming to schema meta.Definition (and thus often to meta.Pattern etc.), interpreted by the SchemaObject machinery to drive parsing, unparsing and so forth.

VARIANT = None class-attribute instance-attribute

None for Definitions (such as bundle.stream.StreamListenerError above) and for overall Enumerations (such as bundle.stream.Mode), or a Symbol for variant definitions contained within an enumeration (such as bundle.stream.Mode.packet).

__preserve__()

Called by preserves.values.preserve: unparses the information represented by this instance, using its schema definition, to produce a Preserves Value.

Source code in preserves/schema.py
def __preserve__(self):
    """Called by [preserves.values.preserve][]: *unparses* the information represented by
    this instance, using its schema definition, to produce a Preserves `Value`."""
    raise NotImplementedError('Subclass responsibility')

decode(v) classmethod

Parses v using the SCHEMA, returning a (sub)instance of SchemaObject or raising SchemaDecodeFailed.

Source code in preserves/schema.py
@classmethod
def decode(cls, v):
    """Parses `v` using the [SCHEMA][preserves.schema.SchemaObject.SCHEMA], returning a
    (sub)instance of [SchemaObject][preserves.schema.SchemaObject] or raising
    [SchemaDecodeFailed][preserves.schema.SchemaDecodeFailed]."""
    raise NotImplementedError('Subclass responsibility')

try_decode(v) classmethod

Parses v using the SCHEMA, returning a (sub)instance of SchemaObject or None if parsing failed.

Source code in preserves/schema.py
@classmethod
def try_decode(cls, v):
    """Parses `v` using the [SCHEMA][preserves.schema.SchemaObject.SCHEMA], returning a
    (sub)instance of [SchemaObject][preserves.schema.SchemaObject] or `None` if parsing
    failed."""
    try:
        return cls.decode(v)
    except SchemaDecodeFailed:
        return None

ShortPacket

Bases: DecodeError

Raised whenever preserves.binary.Decoder or preserves.text.Parser discovers that it needs to read beyond the end of the currently-available input buffer in order to completely read an encoded value.

Symbol(name)

Bases: object

Representation of Preserves Symbols.

>>> Symbol('xyz')
#xyz
>>> Symbol('xyz').name
'xyz'
>>> repr(Symbol('xyz'))
'#xyz'
>>> str(Symbol('xyz'))
'xyz'
>>> import preserves
>>> preserves.stringify(Symbol('xyz'))
'xyz'
>>> preserves.stringify(Symbol('hello world'))
"'hello world'"
>>> preserves.parse('xyz')
#xyz
>>> preserves.parse("'hello world'")
#hello world

Attributes:

name (str | Symbol): The symbol's text label. If an existing Symbol is passed in, the existing Symbol's name is used as the name for the new Symbol.

Source code in preserves/values.py
def __init__(self, name):
    self.name = name.name if isinstance(name, Symbol) else name

annotate(v, *anns)

Wraps v in an Annotated object, if it isn't already wrapped, and appends each of the anns to the Annotated's annotations sequence. NOTE: Does not recursively ensure that any parts of the argument v are themselves wrapped in Annotated objects!

>>> import preserves
>>> print(preserves.stringify(annotate(123, "A comment", "Another comment")))
@"A comment" @"Another comment" 123
Source code in preserves/values.py
def annotate(v, *anns):
    """Wraps `v` in an [Annotated][preserves.values.Annotated] object, if it isn't already
    wrapped, and appends each of the `anns` to the [Annotated][preserves.values.Annotated]'s
    `annotations` sequence. NOTE: Does not recursively ensure that any parts of the argument
    `v` are themselves wrapped in [Annotated][preserves.values.Annotated] objects!

    ```python
    >>> import preserves
    >>> print(preserves.stringify(annotate(123, "A comment", "Another comment")))
    @"A comment" @"Another comment" 123

    ```
    """
    if not is_annotated(v):
        v = Annotated(v)
    for a in anns:
        v.annotations.append(a)
    return v

canonicalize(v, **kwargs)

As encode, but sets canonicalize=True in the Encoder constructor.

Source code in preserves/binary.py
def canonicalize(v, **kwargs):
    """As [encode][preserves.binary.encode], but sets `canonicalize=True` in the
    [Encoder][preserves.binary.Encoder] constructor.

    """
    return encode(v, canonicalize=True, **kwargs)

cmp(a, b)

Returns -1 if a < b, or 0 if a = b, or 1 if a > b according to the Preserves total order.

Source code in preserves/compare.py
def cmp(a, b):
    """Returns `-1` if `a` < `b`, or `0` if `a` = `b`, or `1` if `a` > `b` according to the
    [Preserves total order](https://preserves.dev/preserves.html#total-order)."""
    return _cmp(preserve(a), preserve(b))
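
A function with this -1/0/1 contract can drive sorting through the standard library's `functools.cmp_to_key`. The sketch below uses a hypothetical stand-in comparator, `three_way`, over plain integers so that it runs without the library; passing the real cmp in the same position is an assumption based on its documented return values, not something shown in the source.

```python
from functools import cmp_to_key

def three_way(a, b):
    """Stand-in comparator returning -1, 0 or 1, like cmp."""
    return (a > b) - (a < b)

values = [3, 1, 2, 1]

# cmp_to_key adapts a two-argument comparator to the key= protocol.
ordered = sorted(values, key=cmp_to_key(three_way))
```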

decode(bs, **kwargs)

Yields the first complete encoded value from bs, passing kwargs through to the Decoder constructor. Raises exceptions as per next.

Parameters:

bs (bytes, required): encoded data to decode

Source code in preserves/binary.py
def decode(bs, **kwargs):
    """Yields the first complete encoded value from `bs`, passing `kwargs` through to the
    [Decoder][preserves.binary.Decoder] constructor. Raises exceptions as per
    [next][preserves.binary.Decoder.next].

    Args:
        bs (bytes): encoded data to decode

    """
    return Decoder(packet=bs, **kwargs).next()

decode_with_annotations(bs, **kwargs)

Like decode, but supplying include_annotations=True to the Decoder constructor.

Source code in preserves/binary.py
def decode_with_annotations(bs, **kwargs):
    """Like [decode][preserves.binary.decode], but supplying `include_annotations=True` to the
    [Decoder][preserves.binary.Decoder] constructor."""
    return Decoder(packet=bs, include_annotations=True, **kwargs).next()

extend(cls)

A decorator for function definitions. Useful for adding behaviour to the classes resulting from loading a schema module:

>>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')

>>> @extend(bundle.stream.LineMode.lf)
... def what_am_i(self):
...     return 'I am a LINEFEED linemode'

>>> @extend(bundle.stream.LineMode.crlf)
... def what_am_i(self):
...     return 'I am a CARRIAGE-RETURN-PLUS-LINEFEED linemode'

>>> bundle.stream.LineMode.lf()
LineMode.lf()
>>> bundle.stream.LineMode.lf().what_am_i()
'I am a LINEFEED linemode'

>>> bundle.stream.LineMode.crlf()
LineMode.crlf()
>>> bundle.stream.LineMode.crlf().what_am_i()
'I am a CARRIAGE-RETURN-PLUS-LINEFEED linemode'
Source code in preserves/schema.py
def extend(cls):
    """A decorator for function definitions. Useful for adding *behaviour* to the classes
    resulting from loading a schema module:

    ```python
    >>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')

    >>> @extend(bundle.stream.LineMode.lf)
    ... def what_am_i(self):
    ...     return 'I am a LINEFEED linemode'

    >>> @extend(bundle.stream.LineMode.crlf)
    ... def what_am_i(self):
    ...     return 'I am a CARRIAGE-RETURN-PLUS-LINEFEED linemode'

    >>> bundle.stream.LineMode.lf()
    LineMode.lf()
    >>> bundle.stream.LineMode.lf().what_am_i()
    'I am a LINEFEED linemode'

    >>> bundle.stream.LineMode.crlf()
    LineMode.crlf()
    >>> bundle.stream.LineMode.crlf().what_am_i()
    'I am a CARRIAGE-RETURN-PLUS-LINEFEED linemode'

    ```

    """
    @wraps(cls)
    def extender(f):
        setattr(cls, f.__name__, f)
        return f
    return extender
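
The mechanism is small enough to try on an ordinary class. This standalone sketch re-creates the decorator without the `wraps` call and applies it to `Lf`, a hypothetical stand-in for a schema-generated class.

```python
def extend(cls):
    """Return a decorator that installs the decorated function on `cls`."""
    def extender(f):
        setattr(cls, f.__name__, f)  # the function's own name becomes the method name
        return f
    return extender

class Lf:
    """Hypothetical stand-in for a class produced by schema loading."""

@extend(Lf)
def what_am_i(self):
    return 'I am a LINEFEED linemode'

answer = Lf().what_am_i()
```

Because the decorator returns the function unchanged, the module-level name `what_am_i` stays bound to it as well; reusing the same function name for another class simply rebinds that module-level name without affecting the method already installed.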

is_annotated(v)

True iff v is an instance of Annotated.

Source code in preserves/values.py
def is_annotated(v):
    """`True` iff `v` is an instance of [Annotated][preserves.values.Annotated]."""
    return isinstance(v, Annotated)

load_schema_file(filename)

Simple entry point to the compiler: creates a Compiler, calls load on it, and returns its root Namespace.

>>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')
>>> type(bundle)
<class 'preserves.schema.Namespace'>
Source code in preserves/schema.py
def load_schema_file(filename):
    """Simple entry point to the compiler: creates a [Compiler][preserves.schema.Compiler],
    calls [load][preserves.schema.Compiler.load] on it, and returns its `root`
    [Namespace][preserves.schema.Namespace].

    ```python
    >>> bundle = load_schema_file('docs/syndicate-protocols-schema-bundle.bin')
    >>> type(bundle)
    <class 'preserves.schema.Namespace'>

    ```
    """
    c = Compiler()
    c.load(filename)
    return c.root

parse(text, **kwargs)

Yields the first complete encoded value from text, passing kwargs through to the Parser constructor. Raises exceptions as per next.

Parameters:

text (str, required): encoded data to decode

Source code in preserves/text.py
def parse(text, **kwargs):
    """Yields the first complete encoded value from `text`, passing `kwargs` through to the
    [Parser][preserves.text.Parser] constructor. Raises exceptions as per
    [next][preserves.text.Parser.next].

    Args:
        text (str): encoded data to decode

    """
    return Parser(input_buffer=text, **kwargs).next()

parse_with_annotations(bs, **kwargs)

Like parse, but supplying include_annotations=True to the Parser constructor.

Source code in preserves/text.py
def parse_with_annotations(bs, **kwargs):
    """Like [parse][preserves.text.parse], but supplying `include_annotations=True` to the
    [Parser][preserves.text.Parser] constructor."""
    return Parser(input_buffer=bs, include_annotations=True, **kwargs).next()

preserve(v)

Converts v to a representation of a Preserves Value by (repeatedly) setting

v = v.__preserve__()

while v has a __preserve__ method. Parsed Schema values are able to render themselves to their serialized representations this way.

Source code in preserves/values.py
def preserve(v):
    """Converts `v` to a representation of a Preserves `Value` by (repeatedly) setting

    ```python
    v = v.__preserve__()
    ```

    while `v` has a `__preserve__` method. Parsed [Schema][preserves.schema]
    values are able to render themselves to their serialized representations this way.

    """
    while hasattr(v, '__preserve__'):
        v = v.__preserve__()
    return v
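
The "repeatedly" matters when one `__preserve__` method returns another object that also has one. The sketch below copies the loop from the listing so that it runs standalone; `Celsius` and `Reading` are hypothetical classes, and a plain tuple stands in for a Preserves Value.

```python
def preserve(v):
    # Same loop as in the listing: unwrap until no __preserve__ remains.
    while hasattr(v, '__preserve__'):
        v = v.__preserve__()
    return v

class Celsius:
    def __init__(self, degrees):
        self.degrees = degrees

    def __preserve__(self):
        return ('celsius', self.degrees)

class Reading:
    def __init__(self, temp):
        self.temp = temp

    def __preserve__(self):
        # Returns another object with __preserve__, so the loop runs twice.
        return self.temp

result = preserve(Reading(Celsius(21)))
```

Note that the loop only unwraps the outermost object; anything nested inside the returned value is left as it is.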

safeattrname(k)

Escapes Python keywords by appending _; passes all other strings through.

Source code in preserves/schema.py
def safeattrname(k):
    """Escapes Python keywords by appending `_`; passes all other strings through."""
    return k + '_' if keyword.iskeyword(k) else k
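
A quick check of the behaviour, with the one-line function copied from the listing so that the example runs without the library. The underscore is added as a suffix, and `keyword.iskeyword` is the standard-library test used to decide.

```python
import keyword

def safeattrname(k):
    return k + '_' if keyword.iskeyword(k) else k

escaped = safeattrname('class')  # a reserved word, so it gets a suffix
plain = safeattrname('size')     # an ordinary identifier, passed through
```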

stringify(v, **kwargs)

Convert a single Value v to a string. Any supplied kwargs are passed on to the underlying Formatter constructor.

Source code in preserves/text.py
def stringify(v, **kwargs):
    """Convert a single `Value` `v` to a string. Any supplied `kwargs` are passed on to the
    underlying [Formatter][preserves.text.Formatter] constructor."""
    e = Formatter(**kwargs)
    e.append(v)
    return e.contents()

strip_annotations(v, depth=inf)

Exposes depth layers of raw structure of potentially-Annotated Values. If depth==0 or v is not Annotated, just returns v. Otherwise, descends recursively into the structure of v.item.

>>> import preserves
>>> a = preserves.parse('@"A comment" [@a 1 @b 2 @c 3]', include_annotations=True)
>>> is_annotated(a)
True
>>> print(preserves.stringify(a))
@"A comment" [@a 1 @b 2 @c 3]
>>> print(preserves.stringify(strip_annotations(a)))
[1 2 3]
>>> print(preserves.stringify(strip_annotations(a, depth=1)))
[@a 1 @b 2 @c 3]
Source code in preserves/values.py
def strip_annotations(v, depth=inf):
    """Exposes `depth` layers of raw structure of
    potentially-[Annotated][preserves.values.Annotated] `Value`s. If `depth==0` or `v` is not
    [Annotated][preserves.values.Annotated], just returns `v`. Otherwise, descends recursively
    into the structure of `v.item`.

    ```python
    >>> import preserves
    >>> a = preserves.parse('@"A comment" [@a 1 @b 2 @c 3]', include_annotations=True)
    >>> is_annotated(a)
    True
    >>> print(preserves.stringify(a))
    @"A comment" [@a 1 @b 2 @c 3]
    >>> print(preserves.stringify(strip_annotations(a)))
    [1 2 3]
    >>> print(preserves.stringify(strip_annotations(a, depth=1)))
    [@a 1 @b 2 @c 3]

    ```
    """

    if depth == 0: return v
    if not is_annotated(v): return v

    next_depth = depth - 1
    def walk(v):
        return strip_annotations(v, next_depth)

    v = v.item
    if isinstance(v, Record):
        return Record(strip_annotations(v.key, depth), tuple(walk(f) for f in v.fields))
    elif isinstance(v, list):
        return tuple(walk(f) for f in v)
    elif isinstance(v, tuple):
        return tuple(walk(f) for f in v)
    elif isinstance(v, set):
        return frozenset(walk(f) for f in v)
    elif isinstance(v, frozenset):
        return frozenset(walk(f) for f in v)
    elif isinstance(v, dict):
        return ImmutableDict.from_kvs(walk(f) for f in dict_kvs(v))
    elif is_annotated(v):
        raise ValueError('Improper annotation structure')
    else:
        return v
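
The effect of depth can be seen in a reduced model. This sketch is not the library's code: `Annotated` here is a hypothetical two-field stand-in, `strip` is a hypothetical helper, and only sequences are handled, where the listing above also covers records, sets and dictionaries.

```python
from math import inf

class Annotated:
    """Hypothetical stand-in: a value plus a list of annotations."""

    def __init__(self, item, annotations=()):
        self.item = item
        self.annotations = list(annotations)

def strip(v, depth=inf):
    """Remove up to `depth` layers of Annotated wrappers."""
    if depth == 0 or not isinstance(v, Annotated):
        return v
    item = v.item
    if isinstance(item, (list, tuple)):
        # Children get one layer less than their parent.
        return tuple(strip(x, depth - 1) for x in item)
    return item

inner = [Annotated(1, ['a']), Annotated(2, ['b'])]
doc = Annotated(inner, ['A comment'])

full = strip(doc)              # all wrappers removed
shallow = strip(doc, depth=1)  # only the outer wrapper removed
```

With the default depth every layer is removed, since subtracting one from infinity leaves infinity; with `depth=1` the elements keep their wrappers, matching the `[@a 1 @b 2 @c 3]` result in the doctest above.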