Authors: Daniel Shawcross Wilkerson and Simon Fredrick Vicente Goldsmith

Contributors: Ryan Barrett, Erick Armbrust, Robert Johnson, Alfred Fuller

Date: 22 July 2009, Revision: 0.28

This algorithm is still being vetted and this document is still a draft. For example, we recently learned of a phenomenon of App Engine called "submarine writes" and we haven't finished reviewing the algorithm in light of this phenomenon.

Abstract

Massively scalable web applications encounter a fundamental tension in computing between performance and correctness: performance is often addressed by using a large and therefore distributed machine where programs are multi-threaded and interruptible, whereas correctness requires data invariants to be maintained with absolute certainty. A solution to this problem is transactions [Gray-Reuter].

Some distributed systems such as Google App Engine [GAE] provide transaction semantics but only for functions that access one of a set of predefined local regions of the database: a "Local Transaction" (LT) [TransGAE]. To address this problem we give a "Distributed Transaction" (DT) algorithm which provides transaction semantics for functions that operate on any set of objects distributed across the machine. We assume an underlying layer providing "strong" (distributed-) consistent LTs.

The algorithm is "optimistic" [opt-cur] in the sense that it is optimized for low contention: it is assumed that most DTs will succeed but occasionally contention can make a DT abort.

Introduction

Correctness and performance are two of the essential concerns of software engineering.

where cost is expenditure of any resource: time, space, energy, money, people, machines.

Correctness requires invariants

Reasoning about correctness is best done by establishing and maintaining

Typical invariants maintain the integrity of

Scalable performance requires distributed-ness

Scalable performance requires distributed computing: the machine as a single-point abstraction is a myth that can no longer be maintained. Distributed computers exhibit some properties that make programming challenging:

Having both requires transactions

Maintaining correctness on a distributed machine requires

We need these transactions to have the following properties.

We assume Local Transactions

Some databases, such as that of Google App Engine, provide the ability to

We call these "Local Transactions" (LTs) as their transactional semantics is only available to local parts of the database.

We assume the underlying layer provides "strong" distributed-consistency

This is a paper on distributed databases. The word "consistency" already means one thing in the context of distributed algorithms and another thing in the context of databases. Above we defined a "semantic-consistency", which is the database notion of consistency. Here we discuss "distributed-consistency", which is the distributed-algorithms notion of consistency.

The Google App Engine team calls the property we need (and that GAE provides) "strong [distributed-] consistency"; we will define it more precisely below as "synchronous local sequential distributed-consistency"; see the section "Full proof of Isolation".

We provide Distributed Transactions

In general programs need the ability to do operations across an unrestricted set of objects within a single transaction. In this article we presuppose a layer providing Local Transactions. Upon this layer we construct an algorithm providing transactions

In some sense such a layer has "distributed" its semantics across the partition and thus we call it a "Distributed Transaction" (DT) layer.

We do not address queries

When using our Distributed Transactions the set of objects operated upon must be specified directly by their keys, as one does with an object store, not by predicates on their properties, as one does with a general relational query. Indices in GAE are not updated at the same time as the data; therefore the set of query results can differ from the set of objects that satisfy the predicate [TransIsolGAE]. Further, queries do not honor Local Transactions. Such properties make it difficult to make queries honor Distributed Transactions and doing so is beyond the scope of this article.

The absence of queries is not such an onerous restriction: one may still implement all forms of relations, though not necessarily using traditional relational idioms. For example, to implement a one-many relation, use a list field from the one to the many rather than a who-is-pointing-at-me query for fields pointing from the many to the one [Barrett].
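As a minimal illustration of the list-field idiom, assuming a hypothetical in-memory object store (a plain dict standing in for the database; the keys and field names are invented for the example), the "one" side holds the keys of the "many" side and every access is by key rather than by query:

```python
# Hypothetical in-memory object store: key -> entity (a plain dict).
store = {
    "author/1": {"name": "A", "book_keys": ["book/1", "book/2"]},
    "book/1": {"title": "First"},
    "book/2": {"title": "Second"},
}

def books_of(store, author_key):
    """Follow the list field from the 'one' side to the 'many' side by key."""
    return [store[k] for k in store[author_key]["book_keys"]]

titles = [b["title"] for b in books_of(store, "author/1")]
```

Because the lookup uses only keys, it stays within what a Distributed Transaction can cover.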

We present our algorithm in the context of Google App Engine

Google App Engine [GAE], backed by Big Table [BigTable], provides a method "run_in_transaction" that takes a client function and its arguments and runs that function on those arguments in a single transaction [TransGAE, TransIsolGAE]. GAE transactions have some restrictions as follows.

  1. GAE libraries provide a means by which an object can specify a "parent" object at creation time. A connected component of such parent relationships is called an "Entity Group". A GAE transaction can only operate on objects within one "Entity Group".

  2. In a relational database, all specifications of both single objects and sets of objects are done with a "where" clause predicate, which implies a query to map the predicate to the result set. In contrast, in the GAE/Big Table model there is a distinction between an access to an individual object and a query. GAE Local Transactions allow reading and writing of objects accessed by their id; GAE does not allow queries within Local Transactions.

Our contribution is that we provide transactional semantics without restriction (1): we create a kind of transaction that works across objects not in the same Entity Group. We call these transactions "Distributed Transactions" (DTs) in order to distinguish them from the original GAE "Local (to one Entity Group) Transactions". We do not remove restriction (2): our Distributed Transactions do not work for queries.

It should be noted that the underlying Google App Engine database is "strongly [distributed-] consistent" [Barrett, DatastoreIntro]. We define this below more precisely as "synchronous local sequential distributed-consistency"; see the section "Full proof of Isolation" for more.

We use GAE-specific terminology

Our algorithm applies to any database where the data is partitioned and where data in a single part can be operated on in a single Local Transaction and where these operations are strongly distributed-consistent; however for the purposes of specificity, we will uniformly adopt the terminology of the Python version of Google App Engine. To those who are experienced in this field we think such specificity will not be an undue burden. To those who are new the GAE terminology will give them a specific system with which to play while learning; we think this best as all algorithms are best learned first in the specific.

Please note that GAE uses the terms "get" and "put"; since we provide code fragments we use these terms as well when referring to actual GAE puts and gets. However the more established algorithmic terminology for database interactions is "read" and "write" so we revert to those when not speaking of a literal GAE put and get.

Desired properties

As suggested in "Transaction Processing" by Jim Gray and Andreas Reuter [Gray-Reuter], we want to know that our Distributed Transactions provide the traditional ACID properties: Atomicity, (Semantic-) Consistency, Isolation, and Durability. However many more properties than those are needed for a practical framework, as follows below.

In order to state these properties we need names for the various roles different parts play in the system:

Uniquely-specified objects

These may seem very simple, but I included them because I have violated them before!

ACID

The ACID properties of a transaction are more correctly considered in the following order which starts with the uniquely-specified objects distinguished above and builds successive layers of abstraction.

Other correctness properties

Performance properties

Overview of solution

The app would like to run a client function in a Distributed Transaction. The app calls the distributed transaction infrastructure requesting that the client function be run:

distributed_run_in_transaction(client_func, *args, **kwargs).

The client function is synchronously run in a Distributed Transaction as follows.

  1. A Distributed Transaction (DT) object is created to track the state of the process.

  2. At the end of the process the DT object will also hold the result: whether it succeeded or failed and either the return value or the exception that caused the failure (see section "Completion state is visible even in the presence of asynchrony").

  3. If the thread does not time out then the function will return the DT object synchronously; if it does time out then another thread, such as a thread started to serve a subsequent user request, will have to query for DT objects.

Note that this is the basic Optimistic concurrency [opt-cur] strategy: let the client function do whatever it does without locks and only later get locks and attempt to commit the operation to the database.

Note that the client function should not have any interaction with the outside world other than reading and writing from/to the database; any other such sources and effects cannot be managed by the Distributed Transaction infrastructure.
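The call shape can be sketched as follows. This is only an illustration under stated assumptions: the class DTRecord and its fields are hypothetical, the whole commit phase is reduced to a mode change, and passing the DT object to the client function as its first argument is our assumption (the text does not say how the client obtains it).

```python
class DTRecord:
    """Hypothetical stand-in for the DT object; field names are illustrative."""
    def __init__(self):
        self.mode = "0init"
        self.result = None   # return value of the client function, if any
        self.error = None    # exception that caused the failure, if any

def distributed_run_in_transaction(client_func, *args, **kwargs):
    dt = DTRecord()
    try:
        # Assumption of this sketch: the client receives dt as first argument.
        dt.result = client_func(dt, *args, **kwargs)
        dt.mode = "4done"       # stands in for a successful commit phase
    except Exception as exc:
        dt.error = exc
        dt.mode = "4aborted"    # stands in for abort and cleanup
    return dt
```

The caller inspects the returned object rather than relying on an exception, which matches the requirement that the completion state be held by the DT object.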

Essential strategy: Separating semantic time and actual time

The essence of the transactional nature of this algorithm is that the integrity of the semantic time implied by the dataflow of the client code is preserved even though actual time is being rearranged for performance reasons:

Another way to look at it is that the only effect of the client is to map reads plus other input to writes. Atomicity and Isolation are satisfied if, from the point of view of the database, it looks as though there is one instant in time where the client did its entire computation: reading, computing, and writing all at once.

Overview of the Distributed Transactions algorithm

The algorithm has two major phases: (1) Run & Record and (2) Commit.

Run & Record

Run the client program intercepting its database reads and writes.

When the client is done, flush the cache of any written (or created or deleted) objects by writing their state to "shadow" objects. Write each shadow object into the same entity group as its respective original object. We also record the objects read in get_list_obj and their versions in get_list_version and the objects written in put_list_obj and their shadows in put_list_shadow.

Commit

Make permanent in the database the map from reads to writes computed by the client.

  1. Lock written objects in order:

    1. Sort the written objects by their keys and group by entity group.

    2. In sorted order for each written object, get a write lock on it.

      • If the object is already locked, suspend this DT and roll forward the DT holding the lock.

  2. Check read objects: for each read object abort unless

    1. not-locked-by-other: no other DT holds the write lock on the object, and

    2. has-recorded-version: the database object still has the recorded version number.

  3. Complete writes by entity group: for each written object and in one LT per entity group,

    1. copy the state of the shadow object to that of the real object,

    2. delete the shadow object, and

    3. delete the write lock.

Note that for (client-) named DT-flavored objects, we maintain a pure meta-data object whenever the object is not in existence for semantic continuity of meta-data for the name.

Abort conditions

Overview of analysis

Atomicity: an abort is not possible after the write locks have been obtained and the read objects checked.

Isolation: There is one moment after all the write locks are obtained where the DT "has all locks": mode 2locked. This moment must serialize with that of any other DT that interacts with the same objects where at least one interaction is a write.

Thread safety: Each step of the algorithm proceeds with atomic steps along a monotonic path in the state space taking only correct transitions; thus despite multiple threads the state of the entire distributed transaction will stay within the correct state transition graph.

No deadlock: Writers in stage 3checked only wait on write locks of other writers in 3checked. Locks are obtained consistent with a global order. Readers only wait for writers when in stage 0init and holding no locks. Therefore no wait-for cycles can arise.

No starvation: If a thread times out once in 1ready it will be rolled forward by other threads: blocked DTs roll forward blocking DTs, the app should roll forward old DTs started by the same user before starting new ones, and a background thread rolls-forward any old DTs it finds.

Schema

We require the following new data structures or new fields on existing data structures.

Distributed Transaction objects

A Distributed Transaction, DT, object has the following members:

Optimization: We could get rid of the "created" field and just use the "modified" field in its place with some modification of the semantics.

In order to ensure thread safety we constrain mode transitions to be allowed within the correct mode transition directed acyclic graph (DAG) which is the union of the following graphs:

User object meta-data

Each object in the database is required to have the following additional properties for use by the DTs system:

Note that all user objects also have a "flavor": either LT or DT. However (client controlled) LTs know nothing of this DT meta-data; that is, no object will have "flavor = LT". Any means to infer that an object is participating in the DT infrastructure is enough to know that it has flavor "DT": there is no point in spending space actually recording such a field. See section "Interaction of LTs and DTs".

Shadow objects

We also have a class Shadow that in Python would inherit from Expando. It stores shadow copies of user objects that the client has put but that have not actually been written to the database yet. Each shadow object is put in the same entity group as the user object the state of which it stores. Further we need a special ShadowDelete class which we use when the object is to be deleted rather than having its state updated. In addition to having shadow copies of fields of the object in question to be written, it has the following properties.

Optimization: We could get rid of the created field if GAE queries become reliable; see section "Handling hard timeouts in a background thread".

Note: Due to an artifact of the underlying Big Table implementation, we might need to also save copies of meta-data in the shadow objects. For example we may need to do something special for objects that have an empty list member.

Optimistic user object cache

We keep a cache of user objects that have been read or written. If an object is operated on more than once, we use the cached copy instead. We optimistically ignore the possibility of the object having changed in the database; note that any cache coherency failures will be caught later.

Specifically, the cache consists of three parts as follows.

When the client is done all of the objects flagged as "put" and "delete" in the write operation cache are written to the database as shadows before transitioning to mode 1ready.

Hiding the DT infrastructure from the app

In order to prevent an app author from picking a name that collides with that of the DT infrastructure, all names in the shared namespace should be prefixed with "__DT" and the app author informed of this restriction. Note that this does not violate the rules for names of keys as those rules prohibit a name from both starting and ending with a double underscore [Keys]. The names that should be protected this way are:

Algorithm detail: Run & Record

Create a Distributed Transaction object

  1. Make a DT object dt.

  2. Set dt.mode to 0init.

  3. Put it into the database; note that this will automatically set dt.created.

Note that there is no need for a GAE Local Transaction as we are creating the DT object.

Now the client runs, using the API on dt to read and write objects instead of using the standard database API.

Register objects for reading when requested

The client gets an object X from the database using its key by calling dt.get(key) (instead of using the usual db.get()). This operates as follows:

  1. Return the cached instance of the object if present.

  2. Get the object from the database by key in the standard way.

  3. If it does not exist and if X_key is a name key (instead of a numeric id [Keys, KeyClass]; see section "Namespace management") get the corresponding pure meta-data object and use it instead; see section "Persistent meta-data for names".

  4. If the object is not DT-flavored, abort the DT.

  5. If the read object is locked, suspend this DT and roll forward the DT holding the lock and then retry.

  6. Cache a copy of the object; if there was no object, record "None". Also cache the version number of the initial read.

  7. Return X to the client; return None if we found no named object but a pure meta-data object instead.
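The read path above can be sketched as follows, assuming a hypothetical in-memory store (a dict of entity dicts with illustrative fields "dt_flavored", "version" and "write_lock") and a caller-supplied roll_forward callable that is assumed to always release the blocking lock. The pure meta-data lookup for named objects (step 3) is omitted.

```python
import copy

class Abort(Exception):
    """Raised to abort the DT (hypothetical name)."""

class DT:
    def __init__(self, key, store, roll_forward):
        self.key = key
        self.store = store                # dict: key -> entity dict
        self.roll_forward = roll_forward  # callable(store, blocking_dt_key)
        self.obj_cache = {}               # key -> private copy, or None
        self.version_cache = {}           # key -> version at first read

    def get(self, key):
        if key in self.obj_cache:                     # step 1: cached copy
            return self.obj_cache[key]
        while True:
            entity = self.store.get(key)              # step 2: read by key
            if entity is None:
                break
            if not entity.get("dt_flavored", False):  # step 4
                raise Abort("object is not DT-flavored")
            lock = entity.get("write_lock")
            if lock is None:
                break
            self.roll_forward(self.store, lock)       # step 5, then retry
        self.obj_cache[key] = copy.deepcopy(entity)   # step 6: cache a copy
        self.version_cache[key] = None if entity is None else entity["version"]
        return self.obj_cache[key]                    # step 7
```

Returning a private copy means that client mutations never reach the store before the commit phase.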

If we read an object that already has a write lock and therefore suspend the current DT to roll forward the other, the current thread could time out. Since the DT is still in stage 0init this will abort the DT: there will be no way to roll it forward later. However there is no better alternative: the locking DT is in stage 3checked and cannot be aborted even if we thought it would be prudent to do so and if the reader proceeds it will simply abort later during "Check read objects" (unless the writer aborts).

On a second read of the same object the cache could hide the presence of a write lock or a new version number on the object. In the case of a write lock we miss the opportunity to roll forward the other DT, possibly resulting in it aborting and allowing this DT to proceed. However, since the basic assumption of this algorithm is optimistic, that most transactions succeed, we assume this is an unlikely win. Mostly if we read an object a second time and it is write locked or has a new version number we are out of luck and are going to abort. Checking for this case is a trade-off as it costs an extra read; given that we have assumed that we are in an optimistic regime, we make the trade-off in favor of this case being rare and just always use the cached copy.

Register objects for writing/creating/deleting when requested

For every object X that the client wants to put to the database the client calls dt.put(X) (or create or delete) instead of the usual, say, db.put(X) or X.put(). We imitate the behavior of the App Engine database, but cache the results rather than writing them to the database. This operates as follows.

  1. Write X into the cache.

    1. If X is being deleted, cache a None.

  2. Also cache the write operation:

    1. If X is being put, cache "put".

    2. If X is being deleted, cache "delete".

Note that caching a delete entry for X_key means that if the object is read again, None is returned. That is, this means the fact that the object was deleted is cached, and not that the object is deleted from the cache.
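A minimal sketch of the two caches involved, assuming for simplicity that every object is addressed by an explicit key (creation with a generated id is left out) and using a hypothetical class name WriteCache:

```python
class WriteCache:
    """Illustrative only: object cache plus write operation cache."""
    def __init__(self):
        self.obj_cache = {}   # key -> object, or None if deleted
        self.write_ops = {}   # key -> "put" or "delete"

    def put(self, key, obj):
        self.obj_cache[key] = obj
        self.write_ops[key] = "put"

    def delete(self, key):
        self.obj_cache[key] = None      # the deletion itself is cached
        self.write_ops[key] = "delete"

    def get(self, key):
        return self.obj_cache.get(key)  # reads see earlier writes
```

After delete the key is still present in obj_cache with value None, so a later read returns None without consulting the database.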

When client is done flush the cache

When the client function is done flush the cache as follows.

For each object X in the version number cache, do as follows.

  1. Append X.key() to dt.get_list_obj.

  2. Append X.version to dt.get_list_version. If there was no object, it has version "None".

For each object X flagged "put" or "delete" in the write operation cache, do as follows.

  1. Make a new shadow object in the same entity group as X.

    • If X is flagged "put" copy the state of X into a new instance of class Shadow.

    • If X is flagged "delete" write an instance of the special class ShadowDelete which has no state.

  2. Set shadow.dist_trans = dt.key().

  3. Put the shadow object into the database (now the shadow has a key).

  4. Append X.key() to dt.put_list_obj.

    • Append None instead if X is being created (has no key());

  5. Append shadow.key() to dt.put_list_shadow.

Note that we need two classes for shadows: a class Shadow that holds the state of a put object, and a class ShadowDelete (possibly inheriting from class Shadow) having no state that indicates that the user object is to be deleted.

Note that a Local Transaction is not needed for creating shadow objects.
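The flush can be sketched as follows. All names are illustrative: the DT is a plain dict, keys are (entity_group, name) tuples so that a shadow can share its object's entity group, and creation with a generated id (where None is appended to put_list_obj) is omitted.

```python
import itertools

_ids = itertools.count(1)   # hypothetical id generator for shadow keys

def flush(dt, store, version_cache, obj_cache, write_ops):
    """Record reads in the DT and write one shadow per written object."""
    for key, version in version_cache.items():
        dt["get_list_obj"].append(key)
        dt["get_list_version"].append(version)    # None means "no object"
    for key, op in write_ops.items():
        if op == "put":
            shadow = {"kind": "Shadow", "state": dict(obj_cache[key])}
        else:
            shadow = {"kind": "ShadowDelete"}      # carries no state
        shadow["dist_trans"] = dt["key"]
        # Same entity group as the object; the key is assigned on put.
        shadow_key = (key[0], "shadow-%d" % next(_ids))
        store[shadow_key] = shadow
        dt["put_list_obj"].append(key)
        dt["put_list_shadow"].append(shadow_key)
```

The two put lists stay index-aligned, which is what later allows them to be sorted together.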

Note that at least at this layer of the database, copying the state of the user object to a new shadow object (rather than somehow cleverly reusing objects) is unavoidable. First, if the client function reads objects it has already written out of the cache, it expects the object to have the usual class, not class "Shadow". Second, we cannot write shadow objects into the database with the same class as the user object because then queries for the user objects could find the shadows. Those with access to lower layers of GAE can perhaps do an optimization and simply mutate the "Kind" of the object to Shadow and then back again.

Optimization: if the whole DT reads and writes objects all in one entity group, do everything in a single LT. Don't even bother to write shadows or get write locks; just check for the absence of write locks on both objects written and read, check the read version numbers for read objects, and then write straight to the objects. Instead of transitioning the DT to 1ready, go straight to 4done. If write locks are found on objects being written (but not read) then revert to the usual algorithm.

Optimization: When writing out the buffered-up shadows, sort them in the way they are sorted for write locking and write them out grouped by entity group. Ryan Barrett [Barrett]: "as long as they're in the same single put() or delete() call, they'll be grouped by entity group. this is definitely a significant optimization when you need to write to multiple entities in the same entity group."

Optimization: When writing out the shadows, also read in the objects and get write locks on them in the same pass; note that this now requires using an LT. If getting locks blocks, then revert to simply writing out the shadows from then on until the end of the pass. Record the transition point in the DT when we do the write transitioning to 1ready; resume getting locks from that point during "Lock written objects"; if we manage to get all the locks then write None as the resume point.

Best possible scenario: Note that the above optimization can be combined with the optimization below for objects that were both read and written where we do the read checking at the same time as getting the write locks. It is therefore possible in situations with no other conflicts to do all the preparation work in a single pass over the objects before completing writes.

Go to mode 1ready

This is a special case of section "Transitioning to a new mode" below; we repeat it here for clarity.

When writing out the shadow objects is done, in a Local Transaction:

  1. Get the dt from the database. It should be in mode 0init.

    • If not, go into 3aborting: there is an obscure case involving huge clock skew where the garbage collection thread can accidentally have aborted this DT.

  2. Set dt.mode to 1ready.

  3. Save dt to the database.

Once the DT gets to mode 1ready timeouts are no longer relevant: future DT roll-forwards will eventually complete this DT one way or the other.

If we do not get to this point due to a timeout then the timeout handling code will clean up the shadow objects and notify the user that the DT failed when it reaches mode 4aborted.

This is exactly the second time the DT is written into the database; the first being when it was created in mode 0init. No user objects in the database have been written yet.
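The transition to 1ready might look like the sketch below. A threading.Lock stands in for the GAE Local Transaction and the store is a hypothetical dict of DT records. Leaving a DT that is already in 3aborting or 4aborted untouched is an assumption of this sketch, made so that the mode never moves backwards.

```python
import threading

# Stands in for a GAE Local Transaction on the DT's entity group.
_lt = threading.Lock()

def go_to_ready(store, dt_key):
    """Move the DT from 0init to 1ready; return the mode now stored."""
    with _lt:
        dt = dict(store[dt_key])          # re-read inside the transaction
        if dt["mode"] == "0init":
            dt["mode"] = "1ready"
        elif dt["mode"] not in ("3aborting", "4aborted"):
            dt["mode"] = "3aborting"      # unexpected mode: abort
        store[dt_key] = dt                # save
        return dt["mode"]
```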

Algorithm detail: Commit

Lock written objects in order

The deadlock avoidance algorithm requires us to get the locks in an order consistent with some global order (see section "Ongoing progress: No deadlock"). Any global order will do, so we use the object keys.

  1. Sort put_list_obj by key and permute put_list_shadow to match.

Note that the key sort order should group objects by their entity group [Keys, DatastoreTalk]: (Wilkerson) "When I sort objects by their keys, that also sorts them by their entity group as that is the first part of the key, right?"; Ryan Barrett [Barrett]:"correct." In any case if it did not, change it until it does by sorting by entity group and then by the other parts of the key.
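A sketch of the sort, assuming keys are modeled as (entity_group, name) tuples so that sorting by key also groups by entity group. Placing None entries (creations with generated ids, which have nothing to lock) at the end is a choice of this sketch, not something the text specifies.

```python
def sort_for_locking(put_list_obj, put_list_shadow):
    """Sort object keys and permute the shadow keys to match."""
    pairs = list(zip(put_list_obj, put_list_shadow))
    # None keys sort last; real keys sort by (entity_group, name).
    pairs.sort(key=lambda p: (p[0] is None, p[0] if p[0] is not None else ()))
    objs = [p[0] for p in pairs]
    shadows = [p[1] for p in pairs]
    return objs, shadows
```

Sorting the zipped pairs, rather than the two lists separately, is what keeps each object aligned with its shadow.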

For each object key X_key in put_list_obj and its corresponding shadow key S_key in put_list_shadow, in sorted order, get a write lock on object X as follows.

If there is no shadow object S corresponding to S_key, then the write has already been done and this thread is way behind another thread rolling forward the same DT which is already at least in 3checked or 3aborting. Go to "Dispatching on mode".

If X_key is None, the shadow is an object being created with a generated ID; there is nothing to lock.

Otherwise, do the following in a Local Transaction:

  1. Get X from db.get(X_key).

  2. If it does not exist and if X_key is a name key (instead of a numeric id [Keys, KeyClass]; see section "Namespace management") get the corresponding pure meta-data object and use it instead; see section "Persistent meta-data for names".

  3. If it is not DT-flavored, abort the DT.

  4. If X.write_lock is None:

    1. Set it to dt.key().

    2. Put X.

  5. Otherwise, if X.write_lock is dt.key(), we already hold the lock, so do nothing.

  6. Otherwise X.write_lock points at a blocking DT; suspend operation on this DT and roll forward the blocking DT, as follows.

    1. Exit the Local Transaction.

    2. Roll forward that other DT.

    3. Try again to get the lock.
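The locking steps for one object can be sketched as follows. A threading.Lock stands in for the Local Transaction, the store is a hypothetical dict of entity dicts, and roll_forward is a caller-supplied callable assumed to release the blocking lock eventually. The sketch assumes the object exists and omits the pure meta-data lookup.

```python
import threading

class Abort(Exception):
    """Raised to abort the DT (hypothetical name)."""

_lt = threading.Lock()   # stands in for a GAE Local Transaction

def get_write_lock(store, dt_key, x_key, roll_forward):
    while True:
        with _lt:                                 # one Local Transaction
            x = store[x_key]
            if not x.get("dt_flavored", False):
                raise Abort("object is not DT-flavored")
            holder = x.get("write_lock")
            if holder is None:
                x["write_lock"] = dt_key          # take the lock and put X
                return
            if holder == dt_key:
                return                            # we already hold it
        # Outside the transaction: roll the blocking DT forward, then retry.
        roll_forward(store, holder)
```

The roll-forward happens only after the Local Transaction has been exited, as step 6 requires, so the other DT's own transactions are not blocked by ours.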

Transition into mode 2locked. Optimization: this transition exists only for the proof of Isolation and writing the DT into the database can be elided.

Optimization: When getting write locks it is not strictly necessary to group the object interactions by entity group into one LT, but given the underlying implementation of LTs in GAE, the performance will be better if you do. (Wilkerson) "If I group a bunch of object gets together into one get call, that is faster right?" Ryan Barrett [Barrett]: "right!"

Optimization: if an object is read and written, check the version number in the same Local Transaction as getting the write lock. (Once we have the write lock, no other DT can have it and therefore no other DT can write it and change the version number.)

We must not combine the two passes of getting write locks and checking reads into one pass, as if a read is checked before a write lock is taken, Isolation can fail; see the example in section "The algorithm is tight / Locking and checking". Therefore grouping getting write locks with checking reads on two different objects in the same entity group into one LT is not correct. However we can do the reads, say, in reverse sort order, combining the last entity group of the locking pass with the first entity group of the read-checking pass into one LT even if the particular objects locked and checked in that LT differ.

Check read objects

For each object key X_key, where

check it as follows.

  1. Get X from db.get(X_key).

  2. If X.write_lock is non-empty and not dt.key(), then abort.

  3. If X.version does not equal the corresponding member of dt.get_list_version, then abort.

Transition into mode 3checked.

Note that None is a version number for a non-existent object.
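The two checks can be sketched as one function over a hypothetical in-memory store (a dict of entity dicts with illustrative "version" and "write_lock" fields). A missing object is treated as unlocked with version None; pure meta-data objects for names are omitted.

```python
def check_reads(store, dt_key, get_list_obj, get_list_version):
    """Return True if every read is still valid, False if the DT must abort."""
    for x_key, recorded in zip(get_list_obj, get_list_version):
        x = store.get(x_key)
        lock = None if x is None else x.get("write_lock")
        version = None if x is None else x.get("version")
        if lock is not None and lock != dt_key:
            return False        # not-locked-by-other failed
        if version != recorded:
            return False        # has-recorded-version failed
    return True
```

Versions are compared only for equality, consistent with versions being DT keys rather than counters.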

Optimization: When checking read versions it is not strictly necessary to group the object interactions by entity group into one LT, but given the underlying implementation of LTs in GAE, the performance will be better if you do. We already sort the written objects, but to facilitate this optimization it is likely easiest to also sort the read objects.

Note that we only test version numbers for equality, so as long as DT keys are never re-used correct semantics is ensured: that is, version "numbers" cannot roll-over.

One might contemplate the possibility of the following scenario: one DT reads and records that there is no object and then is suspended; another DT then creates it, then another DT deletes it, then the first resumes, does the version number check which again finds no object, so the check passes and the first DT continues.

See section "Namespace management".

Complete writes by entity group

Completing writes should accomplish the following for each object that the DT has written. Recall that complete writes can be run in either mode 3checked or mode 3aborting.

  1. Monotonicity: If there is no shadow object corresponding to shadow_key then there is nothing to do for its corresponding user object; otherwise, ensure the below properties.

  2. If in mode 3checked:

    1. Ensure the user object reflects the shadow object state even if we have to create or destroy it.

    2. If an object remains, ensure its version is set to dt.key().

  3. Ensure this DT does not hold the write lock.

  4. Ensure the deletion of the shadow object.

  5. Ensure for named objects in the case of a creation or a deletion that the meta-data handoff is accomplished between the user object and its corresponding pure meta-data object and that the pure meta-data object is created or deleted as appropriate.

There are many cases so we discuss in detail how to actually accomplish the goals above.

Note that it is critical to the correctness of allowing LT reads to mix with DTs that the "Complete writes" stage be done by grouping all interaction with one entity group into one LT. This requirement to perform operations grouped maximally by entity groups is in contrast to the other places where we suggest grouping operations by entity group for the purposes of optimization. Note further that if the DT aborted in 0init then the put_list_obj may not be sorted. Therefore sort the objects by their keys; see section "Algorithm detail: Commit / Lock written objects in order" for more discussion on this sorting ensuring grouping by entity group.

For each

do as below depending on the dt.mode.

When done if in mode 3checked transition to mode 4done, otherwise if in mode 3aborting transition to mode 4aborted.

Mode 3checked

If dt.mode is 3checked, for each object do the following:

  1. Get the shadow object from db.get(shadow_key).

  2. Monotonicity: If there is no shadow object in the database corresponding to shadow_key, this write has already been done, so continue to the next iteration of the loop.

  3. If X_key is None, this is a creation.

    1. Create the target object X from the state stored in the shadow.

    2. Set X.version to dt.key().

    3. Put X into the database.

  4. Otherwise, if the shadow is an instance of ShadowDelete, this is a deletion.

    1. If X_key is a name key (instead of a numeric id [Keys, KeyClass]; see section "Namespace management") create the corresponding pure meta-data object for X using state as if we were updating X rather than deleting X:

      • Set write_lock to None.

      • Set version to dt.key().

      • Put the pure meta-data object into the database.

    2. Delete the target object, X. This also deletes its write lock.

  5. Otherwise, this is an update.

    1. Get X from db.get(X_key).

      • If get returns None and X_key is a name key, check for a corresponding pure meta-data object. If there is none, this is a creation; proceed as in the creation case above. Otherwise, if one exists this is a re-creation of a named object, so copy over its meta-data into X and delete the pure meta-data object (also deleting the write lock). From now on, this proceeds as an update to X as follows.

    2. X.write_lock should be the key of this DT; set it to None.

    3. Copy the state of the shadow object to X.

    4. Set X.version to dt.key().

    5. Put X into the database.

  6. Delete the shadow object.
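The per-object steps for mode 3checked can be sketched against an in-memory dictionary standing in for the datastore. This is an illustration under stated assumptions, not the authors' implementation: entities are plain dicts, the shadow fields `state`, `delete`, and `new_key` are hypothetical, and the pure meta-data handling for name keys is omitted (numeric-id keys only). In the real algorithm the body would run inside a Local Transaction.

```python
def complete_write_checked(db, dt_key, x_key, shadow_key):
    """Apply one shadow to its target object in mode 3checked."""
    shadow = db.get(shadow_key)
    if shadow is None:
        # Monotonicity: the shadow is gone, so this write was already done.
        return "already done"
    if x_key is None:
        # Creation: build the target from the state stored in the shadow.
        # "new_key" is a hypothetical field naming where the object goes.
        db[shadow["new_key"]] = {
            "state": dict(shadow["state"]),
            "version": dt_key,
            "write_lock": None,
        }
        outcome = "created"
    elif shadow.get("delete"):
        # Deletion: removing the target also removes its write lock.
        del db[x_key]
        outcome = "deleted"
    else:
        # Update: release the lock, copy the state, and stamp the version.
        x = db[x_key]
        assert x["write_lock"] == dt_key, "lock should belong to this DT"
        x["write_lock"] = None
        x["state"] = dict(shadow["state"])
        x["version"] = dt_key
        db[x_key] = x
        outcome = "updated"
    del db[shadow_key]
    return outcome


db = {
    "x1": {"state": {"n": 1}, "version": "dt0", "write_lock": "dt9"},
    "s1": {"state": {"n": 2}},
}
first = complete_write_checked(db, "dt9", "x1", "s1")
second = complete_write_checked(db, "dt9", "x1", "s1")
```

The second call shows why the monotonicity check matters: a thread that repeats the step after another thread has finished it finds no shadow and does nothing.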

Mode 3aborting

If dt.mode is 3aborting, for each object do the following.

  1. Get the shadow object from db.get(shadow_key).

  2. Monotonicity: If there is no shadow object in the database corresponding to shadow_key, the write on this object has already been aborted, so continue to the next iteration of the loop.

  3. If X_key is None, this is an aborted creation: there is no user object and no lock on it to delete.

  4. Otherwise, this is an aborted deletion or put.

    1. Get X from db.get(X_key).

    2. X.write_lock must be the key of this DT or None. Set the write lock to None if it points at this DT; do not change the write lock if it points to another DT.

    3. Put X into the database.

  5. Delete the shadow object.
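The abort path can be sketched in the same in-memory style. Again this is only an illustration: entities are plain dicts, the function name is hypothetical, and name-key meta-data handling is omitted. The point it shows is that an abort releases only this DT's lock and never touches the state or version of the user object.

```python
def complete_write_aborting(db, dt_key, x_key, shadow_key):
    """Abandon one shadow in mode 3aborting, releasing this DT's lock only."""
    shadow = db.get(shadow_key)
    if shadow is None:
        # Monotonicity: this write was already aborted by some thread.
        return "already aborted"
    if x_key is not None:
        # Aborted deletion or put: the user object exists and may be locked.
        x = db[x_key]
        if x["write_lock"] == dt_key:
            x["write_lock"] = None      # release our own lock
        # A lock held by another DT is left untouched.
        db[x_key] = x
    # For an aborted creation (x_key is None) there is nothing to unlock.
    del db[shadow_key]
    return "aborted"


db = {
    "x1": {"state": {"n": 1}, "version": "dt0", "write_lock": "dt9"},
    "x2": {"state": {"n": 5}, "version": "dt0", "write_lock": "dt7"},
    "s1": {"state": {"n": 2}},
    "s2": {"state": {"n": 6}},
}
r1 = complete_write_aborting(db, "dt9", "x1", "s1")
r2 = complete_write_aborting(db, "dt9", "x2", "s2")
```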

Cross-cutting concerns

Use of GAE Local Transactions

Here we summarize all of the places in the Distributed Transaction algorithm where we use the built-in GAE Local Transactions in the core algorithm. We also give which object is the intended target of the Local Transaction (implicit by the operation, but we include it for clarity). In the algorithm such points are emphasized by the presence of "in a Local Transaction" in bold.

Note that each of the above has to re-read the object from the database on which it is performing an action in the Local Transaction and then write it back again.

Note there is no need to use Local Transactions when performing "Check read objects".

Transitioning to a new mode

The algorithm transitions to a new mode when it has completed or aborted a previous one and is about to start a new mode. What is important is that we enforce that we are not skipping a step and not leaving the DAG of legal transitions (see section "Mode Directed Acyclic Graph").

In this situation there are three modes:

Check that the transition from current mode to new mode is part of the legal transition DAG. Then, to transition from the current mode to a new mode, in a Local Transaction do the following.

  1. Get the dt object. Since other threads may be rolling forward this DT, if we read the DT from the database and it no longer exists, there is nothing further to do on this DT.

  2. If dt.mode != current mode then exit the Local Transaction and go to "Dispatching on mode".

  3. Otherwise, if new mode is "Deleted" (that is, the mode transition is to delete the DT object):

    1. Delete the DT.

  4. Otherwise:

    1. Set dt.mode to new mode.

    2. Put dt into the database.

After the local transaction is done, go to "Dispatching on mode".

Note that as a consequence one thread cannot abort a DT that another thread has already transitioned into 3checked: the thread that wants to abort will read the DT, find it is in 3checked, discard the new mode it was planning on going into, namely 3aborting, and continue to roll the DT forward in 3checked.
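A minimal sketch of this transition protocol follows, using a dictionary in place of the datastore. The edge set `EXAMPLE_EDGES` is illustrative only (the real set is the one given in section "Mode Directed Acyclic Graph"), and the edge from 1ready to 3aborting in particular is an assumption made for the example. The function returns the mode the caller should dispatch on next, or None if the DT no longer exists.

```python
# Illustrative edges only; the authoritative set is the document's mode DAG.
EXAMPLE_EDGES = {
    ("1ready", "3aborting"),    # assumed for this example
    ("3checked", "4done"),
    ("3aborting", "4aborted"),
    ("4done", "Deleted"),
    ("4aborted", "Deleted"),
}


def transition(db, dt_key, current_mode, new_mode, legal_edges):
    """Try to move a DT from current_mode to new_mode; return the mode to dispatch on."""
    if (current_mode, new_mode) not in legal_edges:
        raise ValueError("illegal transition %r -> %r" % (current_mode, new_mode))
    # Everything below stands for the body of one Local Transaction.
    dt = db.get(dt_key)
    if dt is None:
        return None                 # DT was deleted by another thread
    if dt["mode"] != current_mode:
        return dt["mode"]           # another thread moved it; discard new_mode
    if new_mode == "Deleted":
        del db[dt_key]
        return None
    dt["mode"] = new_mode
    db[dt_key] = dt
    return new_mode


db = {"dt1": {"mode": "3checked"}}
# A thread that believed the DT was still in 1ready tries to abort it.
seen = transition(db, "dt1", "1ready", "3aborting", EXAMPLE_EDGES)
```

Here `seen` is the DT's actual mode, so the would-be aborter goes on to roll the DT forward in 3checked, as the note above describes.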

Dispatching on mode

Rolling forward a blocking DT

There are two places where we attempt to interact with an object which may already have a lock on it, forcing us to block.

If this happens we must suspend this DT until that lock is released. In order to prevent blocking indefinitely, we use this thread to roll forward the blocking DT. This is ok as the whole process is thread-safe and deadlock-free.

In order to prevent the inefficient contention and duplication of work caused by multiple threads rolling forward the same DT, if the blocking DT has a modification time that is less than TIMEOUT, then we wait instead of rolling the blocking DT forward. See "A note on time" for more about the function of TIMEOUT and choosing a good one.
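The wait-or-roll-forward decision amounts to comparing the age of the blocking DT against TIMEOUT. A small sketch, assuming times are plain numbers in seconds and that the function and parameter names are hypothetical:

```python
def should_roll_forward(blocking_dt_modified, now, timeout):
    """Return True when the blocking DT is old enough to be rolled forward.

    A DT modified less than `timeout` seconds ago is probably still being
    worked on by another thread, so the caller should wait instead.
    """
    age = now - blocking_dt_modified
    # Clock skew can make age negative; that also means "wait".
    return age >= timeout


wait_case = should_roll_forward(blocking_dt_modified=100.0, now=103.0, timeout=5.0)
roll_case = should_roll_forward(blocking_dt_modified=100.0, now=106.0, timeout=5.0)
```

Because this is only a heuristic, a wrong answer costs duplicated work or extra waiting, not correctness.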

Aborting

If a DT does not complete in state 4done then it must abort, going through 3aborting to clean up its state and then ending up in 4aborted.

During 0init

A DT can fail and need to be aborted while in mode 0init.

If either of these happen the exception handler or soft timeout handler does the following.

  1. Write the DT to the database in mode 3aborting. Note that since this thread is the creating thread and we are still in mode 0init, no Local Transaction or further checking is needed here (unlike below). Note also that this writes the put_list_shadow into the database.

  2. Roll that mode forward by going to "Dispatching on mode".

After 0init

A DT can abort during "Check read objects" if a read object fails a check. A DT cannot abort after reaching 3checked. Aborting is a special case of a mode transition.

  1. Exit any current Local Transaction.

  2. Transition into mode 3aborting using section "Transitioning to a new mode" above. Note that the protocol in that section prevents a DT in mode 3checked from being aborted.

  3. Go to "Dispatching on mode" which should go to section "Completing writes".

Note that when run in mode 3aborting instead of 3checked, section "Complete writes" does not actually copy the state of the shadow objects to that of the user objects; however, it does still delete write locks and shadow objects.

Handling being timed out

When a request times out in App Engine there is first a soft timeout [ExceptionsGAE] and then a hard timeout [RequestTimer]. Ryan Barrett [Barrett]: "[I] think you get 1s btw the soft and hard timeouts, but i'd budget for a lot less. still, you should have enough time to do a few gets and puts".

The soft timeout exception can be caught and should do the following:

  1. Handle potential changes to the DT mode.

    • If the DT mode is 0init, put it into 3aborting as detailed in the section "Aborting during 0init". Do not right now attempt to roll forward the DT to finish the abort procedure and delete the DT and its shadows, as we want to get to the redirect step below.

    • Otherwise if the mode is past 0init, there is nothing to do here.

  2. Throw a soft timeout exception to the app.

    1. The app should redirect the user to a "Still working…" view. It would be best if the client would automatically reload, but if it doesn't then whenever the user interacts again with the app it will query for any operations initiated by the user and roll them forward.

    2. When rolled forward in the case of an abort it will finish the abort procedure and in other cases will continue the roll-forward.

Handling hard timeouts in a background thread

Both the soft timeout handler and the exception handler can experience a hard timeout, leaving a DT in the database. If this DT is in mode 0init, then there may be several shadow objects pointing at it without the put_list_shadow of the DT pointing back at them.

Background processes will soon be possible in Google App Engine [OfflineProcessingTalk]. A background thread cleans up this situation:

Note that the correctness of this algorithm does not depend on bounded clock skew. While the notion of the age of a DT used above assumes an upper bound on clock skew, a suspicious idea at best in a distributed machine, the worst consequence of such skew is reduced performance, aborting DTs that could otherwise have succeeded. See section "A note on time" for the function of TIMEOUT and choosing a good one and for more discussion of clock skew.

The third point of the algorithm above is in fact necessary, as follows. Because GAE queries can fail to return objects that satisfy their predicate [TransIsolGAE], it is possible that the query will miss a shadow object. The thread that was adding these shadow objects should be gone, but if there is huge clock skew it might still be operating, so a shadow could be missed by the query and garbage of these "orphaned" shadows could accumulate. The combination of huge clock skew and a query missing an object is likely to be quite rare, but strictly speaking this is a fundamental problem that cannot be easily removed. The only solution I see is that given in the third point above: put a creation time field onto shadows and have yet another background query that looks for very old shadow objects, checks whether their DT is gone, and deletes them if it is. (Note that there is no way to run a query for shadows the dist_trans field of which points at a deleted DT.) If queries ever become reliable, this third pass and the creation time field can be removed from shadow objects.
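The orphaned-shadow pass can be sketched as follows, with a dictionary standing in for the datastore. The `dist_trans` field comes from the text; the `kind` and `created` fields, the `max_age` threshold, and the function name are assumptions made for the illustration. A real implementation would use a datastore query on the creation time rather than a scan.

```python
def collect_orphaned_shadows(db, now, max_age):
    """Delete shadows that are very old and whose DT no longer exists."""
    removed = []
    for key, entity in list(db.items()):
        if entity.get("kind") != "Shadow":
            continue
        if now - entity["created"] < max_age:
            continue                    # too young: its DT may still be running
        if entity["dist_trans"] in db:
            continue                    # its DT still exists, so it is not orphaned
        del db[key]
        removed.append(key)
    return sorted(removed)


db = {
    "dt1": {"kind": "DT", "mode": "3checked"},
    "s_live": {"kind": "Shadow", "dist_trans": "dt1", "created": 0},
    "s_orphan": {"kind": "Shadow", "dist_trans": "dt_gone", "created": 0},
    "s_young": {"kind": "Shadow", "dist_trans": "dt_gone", "created": 990},
}
removed = collect_orphaned_shadows(db, now=1000, max_age=600)
```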

A note on time

In two places we use timeouts as a heuristic to prevent the inefficiency of multiple threads pushing forward the same DT.

Ryan Barrett [Barrett]: "[T]here's no upper bound on the clock skew between machines. :/" and "[W]e don't make any guarantees about clock skew. in practice, expect just a few seconds or so. maybe up to 10, but that'd probably be unusual". As you can see, we were unable to get an upper bound on the possible clock skew in the Google App Engine infrastructure from the Google App Engine team. Without such a bound, there can be no absolutely reliable global notion of time and therefore correctness cannot depend on it. No distributed algorithm should depend on a global notion of time for correctness and this algorithm does not.

Tuning the wait time

Note that TIMEOUT should be tuned carefully. For example, when rolling forward other DTs that hold a lock we need, as an optimization we wait until the other DT's modified field is at least a little old before rolling it forward, so as to prevent many threads from all doing this roll-forward at once, which is simply wasteful.

Interaction of LTs and DTs

An object that participates in a Distributed Transaction, a "DT-flavored" object, must no longer be written (created/updated/deleted) by client-controlled Local Transactions as those might not honor the DT protocol.

The namespace of named objects must be partitioned into DT-flavored names and LT-flavored names as the identity of a DT-flavored named object persists across deletion and re-creation and (client controlled) LTs need not honor the mechanism for enforcing that; see section "Namespace management".

Since queries do not honor Local Transactions, and therefore do not honor Distributed Transactions either, an exception should be thrown if a query ever returns a DT-flavored object.

Reading DT-flavored objects in an LT is ok since a DT's copy-from-shadow operations are grouped into a single LT per entity group. While LT deletes do not mix with DTs, deleting a DT in a final mode (4done or 4aborted) or in None mode that has expired (see section "Best-effort temporary read locks") is an LT write that can interact with DTs; this is a special case of "Transitioning to a new mode".

Enforcing flavors

In order to enforce that LT writes do not mix with DT operations in an illegal way, every user object in the system is either LT-flavored or DT-flavored.

Note that if one were to attempt to extend this system to allow an object to change flavors one must be mindful of very subtle corner conditions that arise at the conjunction of flavor change and creation/deletion and the conjunction of flavor change and interaction with multiple transactions of different flavors. Such an attempt is not recommended.

Namespace management

Technically there is no "create" operation in App Engine; there are "get", "put", and "delete". That seems oddly asymmetric because it is. There are basically two good regimes for namespace and object management, as follows.

  1. Service controls the namespace. This is the way C++ manages objects: new/delete for creating and deleting objects or "context", read/write for creating/deleting data or "content".

  2. Client controls the namespace. This is the way Perl manages objects, which they call "auto-vivification": you only have "put" and "get" and the namespace is somehow "eternal" and managed for you. If you put an object, it is there. If you get something, it is there. You never have to create or delete anything.

App Engine does a weird hybrid of the two and can act as either depending on usage [Keys].

  1. If you allow keys to be auto-generated then it is (1) above, except that you can tell whether you are doing a create or a write only by looking at the object and checking whether it already has a key before you put it.

  2. If you use user name keys then you are using (2) except that there is no garbage collection thread running in the background so you have to manually delete objects.

Note that in App Engine you can tell whether an object has a client-chosen name key or a service-chosen numeric ID simply by checking if the first character of the key is alpha or numeric. Keys can be retrieved from their objects [KeyMethodOfModel]. A key is not a string, but an object that can be losslessly interconverted to and from a string; you can tell whether a key is named or not by asking the key form of the object [KeyClass].

Names as fields of the entity group

The DT algorithm as stated was designed for idiom (1): letting the datastore pick the ids for objects; however, in the interest of minimizing our impact on the usage of App Engine, we want to allow idiom (2): letting the app pick the names of objects. This is a tricky problem because once the user can pick the object name, the name has an existence independent of the object. The situation is as if the entity group were an object and the name a field on that object. As we noticed before, you can't mix LT writes with DT operations on an object, but by allowing the user to create named objects in both LTs and DTs, we are allowing the mixing of LT writes and DT writes on the entity group "object".

Consider the following example:

  1. DT1 reads object named "foo", finds that it does not exist.

  2. DT1 writes foo, but then is suspended before it can commit this change.

  3. DT2 creates object "foo" and commits.

  4. DT3 deletes "foo" and commits.

  5. DT1 wakes up and commits "foo" and possibly some other related changes.

If foo had been being read and written rather than being created and deleted, this would clearly violate isolation. However, we do not detect it because we are creating and deleting objects instead of reading and writing them. By having a name "foo" we create the sense that all of the objects called "foo" have some semantic continuity which is being violated here. Recall that this example is not possible using generated IDs as creating an object with a generated ID is guaranteed to pick an ID that has never been used before: we cannot delete and then "re-create" an object with an ID as we did with named object "foo" above.

We suggest that the transactional semantics that will make sense to most people is that named objects always have a value, even if that value is "object does not exist"; just as with objects, this value has a version number, a write lock, and a flavor. In the example above, a version number check by DT1 in the last step would have found that object "foo" had been written since DT1 read it and DT1 would have aborted.
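The "foo" example can be replayed against a small model in which a name keeps a version even while its object does not exist. This is a sketch under assumptions: the meta-data layout (`exists`, `version`) and the function name are hypothetical, locking is left out, and only the version check on commit is modelled.

```python
def commit_named_write(names, name, dt_key, version_read, exists):
    """Commit a create or delete of a named object if its version is unchanged.

    names maps a name to meta-data that survives deletion:
    {"exists": bool, "version": key of the last DT that wrote the name}.
    """
    meta = names[name]
    if meta["version"] != version_read:
        return False                # name was written since it was read: abort
    meta["exists"] = exists
    meta["version"] = dt_key
    return True


names = {"foo": {"exists": False, "version": None}}
dt1_read = names["foo"]["version"]    # step 1: DT1 reads "foo", finds nothing
# Steps 3 and 4: DT2 creates "foo" and DT3 deletes it, each reading first.
dt2_ok = commit_named_write(names, "foo", "DT2", names["foo"]["version"], True)
dt3_ok = commit_named_write(names, "foo", "DT3", names["foo"]["version"], False)
# Step 5: DT1 wakes up and tries to commit against the version it read.
dt1_ok = commit_named_write(names, "foo", "DT1", dt1_read, True)
```

Although "foo" does not exist both when DT1 reads it and when DT1 tries to commit, the version has moved on, so DT1's commit is refused.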

However, we do not want to force the use of the Distributed Transactions system on applications that want to operate on named objects using Local Transactions. As noted above in section "Enforcing flavors" mixing DT and LT flavored interaction on named objects results in situations with very subtle semantics and therefore we simply avoid it.

Doing so partitions the namespace in a permanent and unambiguous way.

Persistent meta-data for DT-flavored names

We create persistent meta-data for names as follows.

We adopt a similar restriction as App Engine does on the Name part of an object key [Keys]: application writers may not use Kinds that start with two underscores. We add the convention for others building infrastructures to co-exist with ours that for names starting with double-underscores the part of the namespace after the first non-leading double-underscore is infrastructure meta-data. Further we claim the subspace that starts with "DT".

Note that if we could use the exact same key as the user object, with the same Kind part as well, then everything would be much simpler. In particular, we might be able to allow LT-create and DT-create to mix. However it wouldn't work because queries would find these pure meta-data objects as if they were the actual objects: they would pollute the query results. Therefore we must create pure meta-data objects as a convention of the DT infrastructure; since LTs are not required to honor this convention, we must prohibit LTs from creating named objects having names in the "__DT_"-prefixed namespace, as we stated above.

Application idioms

Additional idioms beyond the core DT algorithm are required in order for the application to provide transactional semantics all the way out to the user interface. We suggest some idioms and ensure that the DT algorithm admits of them.

Providing (user-) synchronous local sequential distributed-consistency / "strong" consistency

In GAE Local Transactions ensure that each entity group exhibits data-sequential distributed-consistency [seq-con]. We preserve this property by completing writes to the database within entity-group granularity Local Transactions.

DTs are synchronous-issue but asynchronous-complete, allowing for the possibility that DTs will complete in an order other than that in which they were issued. While the user can likely tolerate this to some degree, were it to become severe it would be annoying. Therefore we want to preserve the user-local sense of time as much as we can. That is, we want user-synchronous order: we attempt to complete DTs in the order they were requested.

That is, the distributed-consistency can be made user-synchronous in two different ways: one best-effort and the other guaranteed. The idioms that provide these properties vary in other ways such as in their performance impact and therefore the choice between them is a trade-off.

Best-effort

At the start of processing a request the app should first query for any pending DTs by the requesting user and then attempt to roll them forward to completion before initiating any further activity on behalf of that user. We allow for this idiom but it is up to the app to do it. Using this idiom the completion order is best-effort because other threads may be rolling forward incomplete DTs in parallel that were initiated by the same user.

Guaranteed

A more heavyweight approach can guarantee that the DTs issued by a user complete in the order issued, as follows. Maintain a list of pending DTs issued by a user in their User object: when a DT is created, append it to the end of the list, and when a DT is registered as complete (see below) delete it from this list. All threads wishing to roll forward a DT must look up its User object and always roll forward the first DT on the list that is not in state 4done or 4aborted. Always interact with the User object in a Local Transaction. Put all DTs created by a user into the same entity group as its User object; create the DT object and append it to the pending DT list in the User object in the same LT. This list of pending transactions can be presented to the user along with their current state and final result.
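The selection rule for the guaranteed idiom can be sketched as a scan of the user's pending list. The data layout here is an assumption (a list of DT keys on the User object and a dictionary of DT entities), and in the real system the lookup would happen inside a Local Transaction on the User object's entity group.

```python
FINAL_MODES = ("4done", "4aborted")


def next_dt_to_roll_forward(pending, dts):
    """Return the first pending DT key not yet in a final mode, or None."""
    for dt_key in pending:
        if dts[dt_key]["mode"] not in FINAL_MODES:
            return dt_key
    return None


dts = {
    "dt1": {"mode": "4done"},      # finished but not yet acknowledged
    "dt2": {"mode": "3checked"},
    "dt3": {"mode": "0init"},
}
pending = ["dt1", "dt2", "dt3"]    # issue order, oldest first
chosen = next_dt_to_roll_forward(pending, dts)
```

Every thread picks `dt2` regardless of which DT it originally wanted to advance, which is what forces completion in issue order.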

A potential problem is that the user may issue several requests which could potentially be completed independently. If the one registered to be completed first is slow, this will cause the others to block, and in general reduce parallelism. This may be more frustrating for the user than asynchronous completion of DTs.

Note that using guaranteed user-synchronous distributed-consistency is equivalent to providing "strong (distributed-) consistency" or (user-) synchronous local (data-) sequential distributed-consistency to the layer above the DT layer.

Making completion state visible even in the presence of asynchrony

If the distributed_run_in_transaction function manages to return synchronously within the thread that initiated it then it should return the DT. However, in the presence of timeouts, DT completion can be asynchronous. Nonetheless we need the completion state of DTs to always be visible to the app. Therefore when DTs complete we need to keep them around in the database to give the app a chance to acknowledge the completion and reflect it in the user-visible state.

Once the app has acknowledged the change, it can delete the DT; it should only delete DTs that are in states 4aborted or 4done. Note that deleting the DT is a special case of "Transitioning to a new mode" and therefore should be done in a Local Transaction; see the notes on deleting a DT in that section. This may seem like overkill but deleting the DT and reporting its final state to the user should be one action; doing this in an LT prevents odd non-transactional effects in the UI such as, say, the double-reporting of the results of the DT to the user. Of course transactional semantics of such reporting cannot be guaranteed: the app could delete the DT and timeout before reporting the results to the user (or the reverse if done in the reverse order). In any case, transactional semantics in the database is still guaranteed.

Some applications are likely to be sufficiently complex that they should push to a window somewhere in the UI a list of all DTs, what the function and arguments were, and whether they are in process, completed, or aborted. Thus upon completion in 4aborted or 4done, the DT should contain the following information:

Note that annotating the client function onto the DT object should be done at DT object construction time, annotating the return value of the client should be done in the same LT as the mode transition into mode 1ready, and annotating the reason for the abort should be done in the same LT as the mode transition into mode 3aborting.

Useful user display modes

The user is part of the computation. We want to ensure the maintenance of invariants not only within the database but between the state of the database and the state of the user. That is, in an important application, a user taking an action based upon stale data in their UI could result in disaster. We provide idioms to address this problem. There seem to be only a few good points in the tradeoff space between performance and the maintenance of invariants across the user and database.

Transactional interaction with the user

We want to interact with the user, both when reading and writing, in a way that has the ACID properties. That is, we do not want the user seeing a screen of data that is not a consistent and isolated snapshot of the database, nor taking actions based on stale data. We therefore suggest the following idiom.

Recall that this works because an LT read interacts with DT writes transactionally, per the section "Interaction of LTs and DTs".

To read from user in a DT:

Alternative returning the rendered page

Alternatively, it may be more efficient for the client function to

  1. Copy only the object keys and their version numbers, rather than the entire object state, to the new object in the user-page entity group.

  2. Construct the page (say in HTML) to render to the user and return it as the function return value in the DT.

  3. Render that return value to the user when the DT completes.

Note that either buffering the objects to be rendered or buffering a copy of the page (as opposed to pipelining the rendering straight out the socket) is an unavoidable consequence of enforcing transactional semantics. It's just a question of which buffering is more efficient.

Combining with best-effort temporary read locks below

The most elegant way to execute this technique in combination with best-effort temporary read locks below is as follows.

Note that we no longer need to copy the read objects and their version numbers to a user-page object.

Best-effort temporary read locks

Our DT algorithm guarantees transactional semantics. However, being optimistic, a DT is vulnerable on its read side: if a second DT writes one of its read objects before the first can get to 3checked it will be forced to abort. To the user this shows up as the phenomenon that under high contention there can be many aborted DTs possibly resulting in livelock [livelock].

Read lock mechanism

There is likely no one-size-fits-all solution to this problem, however providing a mechanism that allows various apps using the same database to at least attempt to cooperate may help. We therefore suggest a simple temporary read locking mechanism as follows.

When desiring a read lock on a set of objects create a DT

  1. Preemptively populate the get_list_obj and get_list_version with the object keys and their versions respectively.

  2. Set the mode to "None" (instead of say 0init).

  3. Add a flag read_lock set to true.

  4. Add a timestamp timeout field set to the current time plus whatever timeout is desired, say one minute.

  5. Put the DT into the database.

This DT can be kept around and used later to issue a distributed_run_in_transaction as long as it has not expired.

Modify the existing algorithm

We must modify the current algorithm as follows. When attempting to obtain a write lock on an object do the following:

  1. Query for any DTs that

    1. have a read_lock flag,

    2. list the object in their get_list_obj, and

    3. have a timeout later than now.

  2. If the response is non-empty, block until the timeout.
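Both halves of the read-lock mechanism, creating a read-lock DT and checking for live read locks before taking a write lock, can be sketched with plain dictionaries. The field names `get_list_obj`, `get_list_version`, `read_lock`, and `timeout` follow the text; the function names, the use of Python's None for mode "None", and the list scan in place of a datastore query are assumptions.

```python
def make_read_lock_dt(keys_and_versions, now, lifetime):
    """Build a read-lock DT over the given (key, version) pairs."""
    return {
        "mode": None,                  # read-lock mode instead of 0init
        "read_lock": True,
        "get_list_obj": [k for k, _v in keys_and_versions],
        "get_list_version": [v for _k, v in keys_and_versions],
        "timeout": now + lifetime,
    }


def blocking_read_lock_timeout(dts, obj_key, now):
    """Return the latest timeout among live read locks on obj_key, or None."""
    timeouts = [
        dt["timeout"]
        for dt in dts
        if dt.get("read_lock")
        and obj_key in dt["get_list_obj"]
        and dt["timeout"] > now
    ]
    return max(timeouts) if timeouts else None


lock = make_read_lock_dt([("a", "v1"), ("b", "v4")], now=1000, lifetime=60)
dts = [lock]
wait_until = blocking_read_lock_timeout(dts, "a", now=1010)
expired = blocking_read_lock_timeout(dts, "a", now=1061)
unrelated = blocking_read_lock_timeout(dts, "c", now=1010)
```

A writer that gets a non-None `wait_until` would block until that time before trying for the write lock; a None result means it can proceed immediately.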

An expired read lock is a DT that has mode "None" and a timeout in the past. Most of these read-lock DTs can be eliminated more efficiently as follows.

There is a race between transitioning a read-lock DT in mode None into mode 0init versus garbage collecting it by deleting it. Therefore doing either must be done in a Local Transaction; this is a special case of section "Transitioning to a new mode". Note that this is different from simply creating a DT object in mode 0init when it is not transitioning from having been a read-lock DT: that can be done without a Local Transaction.

If read locks fail, transactional properties still hold

Note that

That is, temporary read locks are just a best-effort layer on top of the mechanism which already guarantees the transactional properties.

Write collision merge tool

Many distributed collaboration systems have found that having a transaction result in an all-or-nothing commit or abort is too annoying for users. Frequently a set of changes to one part of the database is basically independent of another set of changes made by another user, but they may technically overlap in some unimportant ways. Thus many collaboration systems provide a merge tool for the user when writes collide; version control systems are one such. Simon suggests that when changes collide, show the user:

Then let the user do the merge. This however is an application-level concern. A more complete analysis of the semantic-consistency property would state this as a property of the system that we then ensure; however, stating correct merge semantics is beyond the scope of this article. We state it for completeness as another use case of the idiom we use of keeping aborted DTs around for post-mortem analysis by the app discussed in section "Making completion state visible even in the presence of asynchrony".

Analysis

We attempt to ensure the algorithm above has the properties we wanted.

Assumptions about the GAE / Big Table infrastructure

Critical: We assume that a root key having a generated numeric ID (as opposed to a name key) will never be reused. Ryan Barrett [Barrett]: "[R]oot, id-based entities of the same kind will always have unique ids[.]" [IDsOfDel, DatastoreTalk, Keys]

Critical: If you do gets and puts not within a run_in_transaction then each one of them is implicitly wrapped in its own run_in_transaction under the hood [Barrett, Armbrust]. That is, there are no gets or puts done not within at least a Local Transaction.

Critical: As LTs need not honor the conventions of the DT layer, certain combinations of operations between DTs and LTs are not allowed, such as mixing LT writes with DTs. Enforcing such restrictions cannot be done if the user is allowed to use the LT layer directly. Erick Armbrust [Armbrust] suggests simply reflecting an interface to the LT layer into the DT layer which delegates to the LT layer but implements the required restrictions and then telling the user to not even import the LT layer. See section "Interaction of LTs and DTs".

Performance: We assume that the GAE thread timeout will never exceed our TIMEOUT value, including skew between machines. Correctness does not depend on this assumption, however if it is wrong, some DTs may be aggressively aborted that would not otherwise have been and multiple threads may end up wastefully pushing forward the same DT.

Objectification

GAE on top of Big Table provides an object abstraction. Further, objects are loaded and stored atomically, and GAE provides Local Transactions for manipulating local groups of objects in more complex ways.

Uniqueness

This property may sound simple, but an earlier version of my design violated it! We want the fact that puts do not happen until the end of the DT to not be able to enter into the dataflow of the application. Here is an example:

This cannot happen in our design as all accessed objects are imported into the in-memory cache which prevents the client from seeing more than one version of an object at a time.
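One way to picture the in-memory cache is as a per-DT layer that answers every get after the first from memory and holds puts until commit. The class below is a hypothetical sketch, not the authors' implementation; a dictionary stands in for the datastore.

```python
import copy


class DtCache:
    """Per-DT cache: the client sees one version of each object."""

    def __init__(self, db):
        self._db = db
        self._cache = {}

    def get(self, key):
        # The first access imports the object; later accesses reuse that copy,
        # so changes made to the database by other DTs are not observed.
        if key not in self._cache:
            self._cache[key] = copy.deepcopy(self._db.get(key))
        return self._cache[key]

    def put(self, key, value):
        # Puts are buffered here and become visible to later gets in this DT.
        self._cache[key] = value


db = {"x": {"n": 1}}
cache = DtCache(db)
before = cache.get("x")["n"]
db["x"] = {"n": 99}                # another DT commits meanwhile
after = cache.get("x")["n"]        # still the version first read
cache.put("x", {"n": 2})
own_write = cache.get("x")["n"]    # the DT sees its own pending put
```

The stale read is harmless here because the version check before 3checked will abort this DT if the object was written by someone else in the meantime.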

Durability

We complete a DT after all the changes have been written to the user objects. Durability of DTs follows from the Durability of the underlying Local Transaction layer.

Atomicity

A DT can abort only under the following circumstances:

Recall that during mode 0init when the client is doing writes, all writes are made to shadow objects, not user objects. In either case above since the DT does not make it to mode 3checked all writes are abandoned.

Once in mode 3checked the DT can no longer abort. During copying a Local Transaction can fail but this is only a transient failure; we simply retry until success.

See section "No starvation due to DTs being abandoned" for details on why a DT cannot languish indefinitely without being rolled forward to completion or being aborted.

Isolation

We copy over all changes to the database from the shadow objects only after all of the objects to be affected have been locked and we only release the locks after the copying is done. Therefore other changes cannot interleave.

However, the changes we make are not against the current version of the objects, they are against the version we read initially. Therefore the last step of acquiring locks before copying includes a step to check that the object version we read is still the current version.

The following example is not a proof, but it captures why this works: DT1 has write locks on a and b. DT1 had written a but not b. DT2 reads a and b. As Simon says, we want to make sure at this point that DT2 is doomed.

See the full proof below in the section "Full proof of Isolation".

Semantic-Consistency

Using DTs correctly to jump only from good states to good states is an application-level concern and is beyond the scope of the transactional layer; however, as it occurs as the "C" of ACID we state it for completeness. A machine-checked assurance of this property would likely take the form of a static analysis showing that for each function run in a DT, if the invariants hold when it starts and it runs to completion, then they hold when it ends.

Local Sequential Distributed-Consistency

In GAE Local Transactions ensure that each entity group exhibits sequential distributed-consistency [seq-con]. We preserve this property by completing writes to the database within entity-group granularity Local Transactions.

Further, in section "Application idioms" we suggest idioms that provide either best-effort or guaranteed (user-) synchronous local sequential distributed-consistency.

Thread safety

In mode 0init actions on a DT are single-threaded. The DT can hold no locks, so no other thread will attempt to roll it forward. If a timed-out instance of a DT in mode 0init is found by another thread that thread can only abort the DT.

After mode 0init, multiple threads can be operating on the same DT. Note therefore that the sequence of states of the DT must have the following properties.

These properties guarantee thread safety; see the full proof below in the section "Full proof of thread safety".

Can always make global progress — no deadlock

One DT only waits for another in two situations.

Therefore the wait-for graph cannot have a directed cycle.

Always making local progress — no starvation

Abandonment

DT objects that (1) time out and then (2) are abandoned by their initiating user could in theory languish indefinitely.

Note that no attempt is made to provide any kind of scheduling fairness.

Conflict

DTs can conflict when they both access an object: while reads share, writes exclude.

Ameliorating write storms

In general an optimistic transaction algorithm is vulnerable to write storms. There is no general remedy for this problem, but algorithms exist for specific situations. For example, read-then-write operations that combine (such as addition) can be performed in parallel using fan-in trees.

Scheduling such trees gets even easier if the operations in question associate and/or commute. However taking advantage of reordering operations requires DTs to be asynchronous-issue: when requested by the user the operations should be added to a queue and run later by a background process.
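As a minimal sketch of the fan-in idea, assuming the combining operation is associative (addition here), queued increments can be combined pairwise in rounds so that only one combined write needs to reach the contended object. The name fan_in_combine and the in-memory list of deltas are hypothetical and are not part of the algorithm described in this paper.

```python
from operator import add

def fan_in_combine(deltas, combine=add):
    """Combine pending deltas pairwise, level by level, like a fan-in tree.

    The pairs within one level are independent of each other and could be
    combined in parallel. Left-to-right order is preserved, so the
    operation only needs to be associative.
    """
    level = list(deltas)
    if not level:
        raise ValueError("nothing to combine")
    rounds = 0
    while len(level) > 1:
        nxt = [combine(level[i], level[i + 1])
               for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:          # an odd element is carried up unchanged
            nxt.append(level[-1])
        level = nxt
        rounds += 1
    return level[0], rounds

# Five queued increments collapse into one combined write.
total, rounds = fan_in_combine([1, 2, 3, 4, 5])
```

With n queued operations the tree has about log2(n) levels, which is why a background process draining a queue can apply them with far fewer conflicting writes than n separate DTs.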

All long-term storage resources are semantic — no garbage

The only non-user data created in the database are shadow objects, DT objects, and write locks.

See section "No starvation due to DTs being abandoned" for details on why a DT cannot languish indefinitely without being rolled forward to either 4done or 4aborted. A DT in this state is semantic in that it communicates to the app that the completion of this DT has not yet been acknowledged. The suggested idiom is that a DT object is deleted by the app after it reaches 4done or 4aborted and is acknowledged by the app.

All shadow objects are attached to the DT that created them and normally can be found from it through the put_list_shadow field. Whether the DT completes normally or is aborted, except in the presence of a hard timeout during mode 0init, its shadow objects are deleted during "Complete writes" along with their corresponding write locks.

In the presence of a hard timeout during 0init, the situation is more complex; see "Handling hard timeouts in a background thread" for the details.

DT object keys are never re-used. This is the only kind of garbage that accumulates: unusable keys. Keys of DTs must therefore forever grow longer.

Preservation of read-only-ness — no gratuitous writing

By inspection it is clear that our locking protocol writes a user object only when that object is written by the client.

Full proof of thread safety

Definition: Let the entire state of a DT be the product of the states of

Definition: Let the correct entire state transition graph be a non-deterministic directed acyclic graph on the set of possible states of a DT given by the non-deterministic union of all possible entire state transcripts of a single-threaded lifetime of a DT. By "non-deterministic union" we mean that if two possible transitions can occur, both are included in the graph. Note that the legal mode transition DAG is a subset of this graph as the modes are a subset of the state.

Definition: An algorithm is thread-safe if when executed in the multi-threaded case the database state of the DT only makes transitions that lie within the correct entire state transition graph. That is, the multi-threaded version of the algorithm behaves as if it were a single-threaded version.

Lemma: once a DT makes a mode transition it cannot be unmade

By the section "Transitioning to a new mode", a transition has the following properties:

One important consequence of atomicity is that there can be no confusion between two threads operating on the same DT as to whether a DT is aborting or not.

Corollary: No continuing a DT in mode 3aborting or 4aborted. An aborted DT cannot go on to further non-aborting modes, such as 3checked: when the mode transition is attempted, the transitioning thread will find mode 3aborting or 4aborted.

Corollary: No aborting a DT in mode 3checked. In particular once a thread has transitioned a DT into mode 3checked, if a second thread rolling forward the same DT later repeats some of the read-checking and finds a user object is now write-locked or has a new version number, that second thread cannot now abort the DT: when it attempts to transition into 3aborting, it will be stopped by the fact that the DT is now in 3checked.
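The two corollaries can be illustrated with a small sketch. It assumes a table of legal mode transitions that is only a guess based on the modes named in this document (the authoritative DAG is given in the section "Transitioning to a new mode"), and it uses a threading.Lock as a stand-in for the Local Transaction on the DT object. The class name DTObject and its method are hypothetical. Mode 2locked is left out because it is not written to the database.

```python
import threading

# ASSUMPTION: an illustrative transition table, not the paper's definitive DAG.
LEGAL = {
    "0init": {"1ready", "3aborting"},
    "1ready": {"3checked", "3aborting"},
    "3checked": {"4done"},
    "3aborting": {"4aborted"},
    "4done": set(),
    "4aborted": set(),
}

class DTObject:
    def __init__(self):
        self.mode = "0init"
        self._lt = threading.Lock()   # stands in for a Local Transaction

    def transition(self, new_mode):
        """Atomically move to new_mode if that is legal from the current mode.

        Returns the mode in effect afterwards, so the caller learns whether
        it won the transition or found the DT already somewhere else.
        """
        with self._lt:
            if new_mode in LEGAL[self.mode]:
                self.mode = new_mode
            return self.mode

dt = DTObject()
dt.transition("1ready")
first = dt.transition("3checked")     # thread 1 finishes read checking
second = dt.transition("3aborting")   # thread 2 then tries to abort
```

In this sketch the second thread is told that the DT is in 3checked and so must continue rolling it forward rather than abort it.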

Theorem: The algorithm is thread safe

Mode 0init is thread safe because of the following.

States having modes subsequent to 0init must be considered to be multi-threaded as

In these modes, the client is finished and so its state is no longer relevant. The remaining entire state consists of the state of

The "strong" distributed-consistency property of the underlying Local Transactions means that the entire state (that of the DT object and of the client objects) may be coordinated by a thread: the changes made by that thread must show up in the database objects in the same order as the Local Transactions in which the thread issues them. This sentence may be so obvious as to be confusing if you have never contemplated underlying distributed-consistency properties weaker than "strong" distributed-consistency; see subsection "Strong consistency" of section "Full proof of Isolation" for more.

Multiple threads may therefore roll forward a system without any race conditions as long as the following properties hold.

Note that one way to make a state progression monotonic is to make the changes idempotent, or two-state monotonic: doing the change once is the same as having done it many times — that is, the change simply goes to a particular goal state.
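A minimal sketch of such a goal-state change follows, assuming an in-memory dict in place of the database and a hypothetical complete_write helper. In the real algorithm the body of the function would run inside one Local Transaction on the entity group of the user object.

```python
def complete_write(db, key, dt_id):
    """Copy the shadow over the user object, then drop the shadow and lock.

    The change simply goes to a goal state: once the shadow is gone,
    repeating the call changes nothing.
    """
    shadow_key = ("shadow", key, dt_id)   # hypothetical key layout
    shadow = db.pop(shadow_key, None)
    if shadow is None:
        return False                      # another thread already did this
    db[key] = {"value": shadow["value"], "version": dt_id, "write_lock": None}
    return True

db = {
    "X": {"value": 0, "version": "dt-1", "write_lock": "dt-2"},
    ("shadow", "X", "dt-2"): {"value": 1},
}
did_first = complete_write(db, "X", "dt-2")    # first thread rolls forward
state_after_first = dict(db)
did_second = complete_write(db, "X", "dt-2")   # second thread repeats it
```

Two threads rolling forward the same DT may both call this; the database ends in the same state either way.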

The state transitions of the algorithm are monotonic and atomic

The following list of state transitions is exhaustive and the transitions made are atomic and monotonic.

Full proof of Isolation

Objects have a well-defined identity over time since each object has a unique key. User object states are only updated through DT writes and a DT write annotates the object written with a version number, namely the id of the last DT to write it. Therefore, every user object state corresponds to a unique (object key, version number) pair. A DT reads a set of user object states as input (and possibly takes other input as well) and writes another set of user object states as output. All persistent application computation, that is, changes to the object states, is performed by a DT in this way.

State matrix

To help visualize this space of states, arrange the set of such states in a matrix where all the states of a given object key occur in a column. Since all operations writing user object states occur in a Local Transaction on that object and these Local Transactions can be linearly-ordered (by isolation of Local Transactions), for each column, sort the states vertically top-to-bottom in that order.

If you want you could use this same matrix to visualize the deadlock avoidance algorithm by drawing waits-for edges on it and noticing that there can be no cycles. Recall that for deadlock avoidance object keys are sorted in a global order (the order in which locks are obtained). The simplest visualization would be to sort the columns left-to-right in that order; waits-for edges will always go to the right. However doing this is unnecessary for the Isolation proof.

Synchronous local sequential distributed-consistency / "Strong" consistency

This is a paper on distributed databases. The word "consistency" already means one thing in the context of distributed algorithms and another thing in the context of databases. Above we defined "semantic-consistency", which is the database notion of consistency. Here we discuss "distributed-consistency", which is the distributed-algorithms notion of consistency.

A Local Transaction is an interaction between one thread and one entity group. (Note that all database interactions are forced to effectively occur in a Local Transaction, as an LT is created for any operation that does not have one.)

We need the underlying layer to provide a "synchronous local" version of "sequential [distributed-] consistency" [seq-con].

To keep the name short, we also call these properties collectively "strong" (distributed-) consistency.

Transitivity of the various kinds of local time

In general, we use "<" to denote "happens before". Recall that in a distributed system time only makes sense in the context of some locality, such as the Local Transaction stream of

Data-Local Sequential (Distributed-) Consistency: When we wish to specify a locality we make it explicit by annotating with the locality: "<(X)" meaning in the local time of object X or "<(DT-A)" meaning in the local time of Distributed Transaction DT-A.

Thread-Local Sequential (Distributed-) Consistency: There is also a local time of a particular thread that is pushing forward the state of the DT; since transitions on the state of a DT are done in an LT the thread state can be finer-grained than the state of the DT but is forced to stay consistent with it. (Only using this finer-grained thread-local time can we reason about the transition into mode 2locked.)

Synchrony (Blocking reads and writes): Using "strong" distributed-consistency of the underlying layer defined above we can combine "<" sentences from the local times of different objects and DTs and conclude new sentences by transitivity of "<".

Enforced inequalities

The algorithm enforces several useful inequalities as follows. Note that we say a DT reads an object X when the first read of X occurs, and the DT writes X when the shadow is copied to the user object.

All-locking-brackets-2locked: Recall that in the local time of any DT there is a time when it first enters mode 2locked, which we will write "DT 2locked". For all objects X written by that DT:

DT locks X <(DT) DT 2locked <(DT) DT unlocks X.

All-reads-and-checks-bracket-2locked: Further, all reading is done before entering mode 2locked and all read checking is done after it. For all objects X read by that DT:

DT reads X <(DT) DT 2locked <(DT) DT checks X.

Below DT-A and DT-B are two distinct DTs that do not subsequently abort and X is an object.

Locks-mutually-exclude: Only one DT can hold the write lock of X at a time, so if DT-A writes before DT-B writes then:

DT-A unlocks X <(X) DT-B locks X.

Locks-exclude-reads-and-checks-after: Writing and unlocking occur in the same LT, so if DT-A writes X before DT-B reads X then:

DT-A unlocks X <(X) DT-B reads X <(X) DT-B checks X.

Locks-exclude-reads-and-checks-before: If DT-B reads X before DT-A wrote X then DT-B may not check after DT-A writes or it will abort during read checking from the updated version number. Further DT-B may not check X between DT-A locking X and DT-A writing X or it will abort during read checking from DT-A's write lock. Therefore if DT-B reads X before DT-A writes X then:

DT-B reads X <(X) DT-B checks X <(X) DT-A locks X.

Lemmas

Lemma write-write: DT-A write X <(X) DT-B write X implies DT-A 2locked < DT-B 2locked.

This follows from All-locking-brackets-2locked and Locks-mutually-exclude:

DT-A 2locked <(DT-A) DT-A unlocks X <(X) DT-B locks X <(DT-B) DT-B 2locked.

Lemma write-read: DT-A write X <(X) DT-B read X implies DT-A 2locked < DT-B 2locked.

This follows from All-locking-brackets-2locked and Locks-exclude-reads-and-checks-after and All-reads-and-checks-bracket-2locked:

DT-A 2locked <(DT-A) DT-A unlocks X <(X) DT-B reads X <(DT-B) DT-B 2locked.

Lemma read-write: DT-A read X <(X) DT-B write X implies DT-A 2locked < DT-B 2locked.

This follows from All-reads-and-checks-bracket-2locked, Locks-exclude-reads-and-checks-before (with A and B reversed), and All-locking-brackets-2locked:

DT-A 2locked <(DT-A) DT-A checks X <(X) DT-B locks X <(DT-B) DT-B 2locked.

Lemma read-read: DT-A read X <(X) DT-B read X implies DT-A 2locked < DT-B 2locked or DT-A and DT-B read the same version number of X.

If DT-A read X <(X) DT-B read X then there are two cases:

(1) If a write on X by some DT-C came between the two reads, then using Lemma read-write and Lemma write-read we have that

DT-A 2locked < DT-C 2locked < DT-B 2locked.

(2) If no write came between the two reads then the two reads were of the same version number of X.

Theorem: DTs are totally-orderable consistent with each local access

The conjunction of all four lemmas is the following:

For all objects X and for all DTs DT-A and DT-B if DT-A accesses X <(X) DT-B accesses X, then

Therefore all such objects X "might as well have been accessed in the same order": that is,

Therefore there exists a partial order on all DTs that is consistent with all local object accesses.

Any partial order can be extended to a total order.
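Extending a partial order to a total order is what a topological sort does. The sketch below uses the standard library graphlib.TopologicalSorter (Python 3.9 or later). The DT names and the precedence edges are invented for illustration; in the proof the edges would be the "2locked happens-before" relations given by the four lemmas.

```python
from graphlib import TopologicalSorter

def total_order(precedes):
    """Return one total order consistent with the given partial order.

    precedes maps each DT to the set of DTs that must come before it.
    A cycle would raise graphlib.CycleError; the theorem says none exists.
    """
    return list(TopologicalSorter(precedes).static_order())

# Hypothetical precedence edges: DT-A before DT-B and DT-C, both before DT-D.
precedes = {
    "DT-B": {"DT-A"},
    "DT-C": {"DT-A"},
    "DT-D": {"DT-B", "DT-C"},
}
order = total_order(precedes)
```

DT-B and DT-C are unordered relative to each other, so either may come first; any such order is a valid serialization.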

The algorithm is tight

The mechanism provided by the algorithm is sufficient to provide the properties that we want; however, there is also nothing extra: each mechanism is also necessary to accomplish this.

Modes

Any time there is a mode in which the DT can abort there needs to be a mode after that when the danger of aborting has passed. The DT can go into that next mode or 3aborting but never both, as mode transitions are conducted in a Local Transaction on the DT object in the database; this prevents ambiguity between any two threads as to whether it is being rolled forward or aborting. Therefore modes 1ready and 3checked are needed as they follow 0init and 2locked, which can abort. Similarly mode 3aborting is needed to definitively initiate an abort.

Modes 4aborted and 4done are needed so the app can examine the DT and know that it is done. Mode 2locked exists for use in the proof of Isolation but there is no need to write it into the database, so we include an optimization where we do not.

We need mode 0init to be distinct from the mode None (the mode of DTs functioning as best-effort read locks waiting to be used) because once in mode 0init the read lock timeout field no longer applies to the DT. The real question is why we have to put the DT into the database before the shadows are written (when not using best-effort read locks). Consider the following. When the background thread finds an old shadow it must ask whether there is a DT pointing to it in order to know whether the shadow is still in use. We don't want to query for DTs that point to it because if that query had a false negative [TransIsolGAE] we would delete a shadow for a possibly active DT. This would be an unrecoverable error: the DT could be in 3checked, already copying shadows and no longer able to be aborted. Therefore we have a dist_trans field on the shadow so we can ask whether a particular DT exists, and if it does not then we know we can delete the shadow. In order to write the dist_trans field onto shadows, we must have a key for their DT object. The only way to do that is to have put the DT into the database before mode 1ready. We might as well call that mode 0init.
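The background check described above can be sketched as follows, assuming a plain dict in place of the database and a hypothetical helper name. The point is only that the DT is looked up by the key stored in dist_trans rather than found by a query.

```python
def delete_orphaned_shadows(datastore, old_shadow_keys):
    """Delete old shadows whose dist_trans key names no existing DT."""
    deleted = []
    for shadow_key in old_shadow_keys:
        shadow = datastore.get(shadow_key)
        if shadow is None:
            continue                      # already cleaned up
        if shadow["dist_trans"] not in datastore:
            del datastore[shadow_key]     # its DT no longer exists
            deleted.append(shadow_key)
    return deleted

# Hypothetical keys; only the dist_trans field name comes from the paper.
datastore = {
    "dt:1": {"mode": "3checked"},
    "shadow:a": {"dist_trans": "dt:1"},   # DT still exists: keep
    "shadow:b": {"dist_trans": "dt:2"},   # DT is gone: orphan
}
deleted = delete_orphaned_shadows(datastore, ["shadow:a", "shadow:b"])
```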

Data structures and organization

The created field of the DT is needed so the app can roll forward DTs in the order that the user issued them. The modified field is needed so that, when blocked on obtaining a write lock, the thread of the blocked DT can guess whether any other thread is likely to be rolling forward the blocking DT and decide whether to jump in and do that itself (if it is old) or wait and avoid duplicate work (if it is not); see the comments on TIMEOUT above in the section "A note on time".
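The decision made with the modified field might look like the sketch below. The 30-second value of TIMEOUT is an assumption chosen only for illustration, and should_roll_forward is a hypothetical helper name.

```python
from datetime import datetime, timedelta

TIMEOUT = timedelta(seconds=30)   # ASSUMED value, for illustration only

def should_roll_forward(blocking_modified, now, timeout=TIMEOUT):
    """Guess whether the blocking DT has been abandoned by its thread.

    If the blocking DT was modified recently, some thread is probably still
    working on it, so the blocked thread waits; if it is old, the blocked
    thread rolls the blocking DT forward itself.
    """
    return now - blocking_modified > timeout

modified = datetime(2009, 7, 22, 12, 0, 0)
stale = should_roll_forward(modified, modified + timedelta(seconds=31))
fresh = should_roll_forward(modified, modified + timedelta(seconds=5))
```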

The get_list_obj and get_list_version fields are needed to find the read objects and their versions. The put_list_obj field is needed to find the put objects; put_list_shadow is needed so that the shadow for an object can be found without resorting to a query on the dist_trans field of the shadow. Recall that queries are not allowed in Local Transactions.

Without the shadow objects being in the same entity group as the user object the copy of the state of the shadow object to that of the user object and the deletion of the shadow object could not be done in a Local Transaction.

Without the write lock and version of an object being in the same entity group as the user object the read checking cannot use a Local Transaction to force serialization with

See the discussion on why we need mode 0init above for why we need the dist_trans field on a shadow object. Without the created field on a shadow object the third query for orphaned shadow objects in the same aforementioned section would not be possible.

Without the cache we could not prevent the fact that writes are buffered as shadow objects from showing up to the client function which would be a failure of Uniqueness.

Locking and checking

Without buffering writes in shadow objects, an abort during checking version numbers could leave some writes done without others which would be a failure of Atomicity.

Without sorting the written objects by their keys deadlock could happen when DTs wait on each other for the write locks.
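A small sketch of why the global key order matters, using threading.Lock objects as in-memory stand-ins for the write locks (in the algorithm itself the locks live in the database next to the user objects). The two DTs name their write sets in opposite orders, yet both acquire in sorted order, so neither can hold b while waiting for a.

```python
import threading

# In-memory stand-ins for the per-object write locks.
write_locks = {key: threading.Lock() for key in ("a", "b")}

def run_dt(name, write_keys, finished):
    ordered = sorted(write_keys)        # the global order on object keys
    for key in ordered:
        write_locks[key].acquire()      # waits only while holding smaller keys
    finished.append(name)               # copying shadows would happen here
    for key in ordered:
        write_locks[key].release()

finished = []
threads = [
    threading.Thread(target=run_dt, args=("DT-A", ["a", "b"], finished)),
    threading.Thread(target=run_dt, args=("DT-B", ["b", "a"], finished)),
]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=5)
```

If each DT instead locked in the order it was given, DT-A could hold a while DT-B holds b, and each would wait on the other forever.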

Without write locks writes from different DTs could interleave which would be a failure of Isolation.

Without recording and checking version numbers, other writes could interpose between two reads which would be a failure of Isolation.

Without checking all the reads only after getting all the write locks a second DT could write the read object after the first but read the written object before the first which would be a failure of Isolation. Here is an example. Note that (1) DT-A reads X before DT-B writes X but (2) DT-B reads Y before DT-A writes Y.

DT-A: read X; write shadow Y; check read X; pause
DT-B: read Y; write shadow X; check read Y; lock X; copy shadow X; unlock X; done
DT-A: lock Y; copy shadow Y; unlock Y; done

Without the read check also checking for the absence of a write lock along with a version number reads can interpose between writes, violating Isolation. Here is an example.

Invariant: X + Y = 0. Start: X = 0, Y = 0.
DT-A: read X = 0 / version 1; read Y = 0 / version 1; X := X + 1; Y := Y - 1; write shadow X; write shadow Y; lock X; lock Y; check X version (pass); check Y version (pass); copy shadow X; pause
DT-B: read X = 1 / version 2; read Y = 0 / version 1 (INVARIANT FAIL!); write shadow Z (MUST NOT HAPPEN); lock Z; check X version (pass, and no lock); check Y version (pass, BUT HAS A WRITE LOCK); copy shadow Z (HORROR!); done
DT-A: copy shadow Y; done
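The state at the moment DT-A pauses in the example above can be replayed in a short sketch. The UserObject class and read_check function are hypothetical; the sketch also assumes that a DT's own write lock on an object it both reads and writes does not fail its read check.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class UserObject:
    value: int
    version: int
    write_lock: Optional[str] = None   # id of the DT holding the lock, if any

def read_check(obj, version_read, dt_id):
    """Pass only if the version is unchanged and no other DT holds the lock."""
    if obj.version != version_read:
        return False
    return obj.write_lock in (None, dt_id)

# X has been copied and unlocked by DT-A; Y is still write-locked by DT-A.
x = UserObject(value=1, version=2)
y = UserObject(value=0, version=1, write_lock="DT-A")

# DT-B read X at version 2 and Y at version 1.
version_only_ok = (x.version == 2) and (y.version == 1)
full_check_ok = read_check(x, 2, "DT-B") and read_check(y, 1, "DT-B")
```

A check on version numbers alone would let DT-B commit after seeing X + Y = 1; including the write lock in the check makes DT-B abort.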

Future Work

We have omitted an attempt to allow queries within DTs. This is made particularly difficult due to the fact that native GAE queries do not honor even Local Transactions and further the allowed native GAE query predicates are rather limited compared to, say, those of SQL queries. One might imagine providing queries by explicitly providing within the DT infrastructure itself the locking necessary to achieve Isolation. In doing so, we could even provide queries of more complexity than native GAE queries. However, note that the difficulty of providing Isolation for such queries depends very much on the richness of the query language.

We rely on "strong (distributed-) consistency" in the underlying layer; an attempt could be made to design an algorithm that only needs a weaker level of distributed-consistency. Requiring only, say, causal distributed-consistency [cause-con] would make the algorithm far more general.

We have not investigated what further optimizations might be possible using deeper integration with the lower level APIs of App Engine and Big Table. For example, perhaps a shadow object could simply "replace" a user object rather than having its state be "copied".

We have not performed any performance evaluation.

Acknowledgments

Thanks to Russell Sears, Karl Chen, Ryan Barrett, Erick Armbrust, Robert Johnson, and Alfred Fuller for discussions leading to this design.

Thanks also to Ryan for not only reading the entire article, but also

Thanks also to Erick for not only reading the whole article, but also

Thanks also to Rob for

Thanks also to Alfred for

References

Personal

[Armbrust] Personal communication from Erick Armbrust of Google.

[Barrett] Personal communication from Ryan Barrett of the Google App Engine team.

[McPeak] Personal communication from Scott McPeak.

Print

[Gray-Reuter] Jim Gray and Andreas Reuter. 1993. Transaction Processing: Concepts and Techniques. Morgan Kaufmann.

[O'Neil-O'Neil] Patrick O'Neil and Elizabeth O'Neil. 2001. Database: Principles, Programming and Performance. Morgan Kaufmann.

Project Home-pages

[BigTable] Big Table: http://labs.google.com/papers/bigtable.html

[GAE] Google App Engine: http://code.google.com/appengine/docs/

Web Articles, Documentation, Talks, and Discussions

[DatastoreIntro] Introducing the GAE Datastore: http://code.google.com/appengine/docs/python/datastore/overview.html#Introducing_the_Datastore

[DatastoreTalk] Under the covers of the GAE Datastore: http://snarfed.org/space/datastore_talk.html

[ExceptionsGAE] GAE Exceptions: http://code.google.com/appengine/docs/python/datastore/exceptions.html

[IDsOfDel] "are numeric ids of deleted entities reused?": http://groups.google.com/group/google-appengine/browse_thread/thread/dec83c2dbd9542e4

[KeyClass] The Key Class: http://code.google.com/appengine/docs/python/datastore/keyclass.html

[KeyMethodOfModel] The key() method of Model: http://code.google.com/appengine/docs/python/datastore/modelclass.html#Model_key

[Keys] Keys and Entity Groups: http://code.google.com/appengine/docs/python/datastore/keysandentitygroups.html

[OfflineProcessingTalk] Offline Processing on App Engine: a Look Ahead: http://code.google.com/events/io/sessions/OfflineProcessingAppEngine.html

[RequestTimer] The Request Timer: http://code.google.com/appengine/docs/python/runtime.html#The_Request_Timer

[TransGAE] GAE Transactions: http://code.google.com/appengine/docs/python/datastore/transactions.html

[TransIsolGAE] Transaction Isolation in App Engine: http://code.google.com/appengine/articles/transaction_isolation.html

Wikipedia

[cause-con] Causal consistency: http://en.wikipedia.org/wiki/Causal_consistency

[dining-phil] Dining philosophers problem and solutions: http://en.wikipedia.org/wiki/Dining_philosophers_problem#Resource_hierarchy_solution

[livelock] Livelock: http://en.wikipedia.org/wiki/Livelock#Livelock

[opt-cur] Optimistic concurrency: http://en.wikipedia.org/wiki/Optimistic_concurrency_control

[seq-con] Sequential consistency: http://en.wikipedia.org/wiki/Sequential_consistency

Copyright 2008-2009 Daniel Shawcross Wilkerson and Simon Fredrick Vicente Goldsmith.