Efficient Use of Asynchronous Operations in Google App Engine

Today's guest post comes from Jason Vertrees, Ph.D., CTO of RealMassive, and David Lippa, Principal Software Engineer. RealMassive provides real-time commercial real estate information and is powered by Google App Engine.

Multiprocessing and multithreading are a mainstay of modern applications. Google App Engine provides two primary mechanisms for asynchronous execution: the tasklet and the deferred task. In this post we’ll go deep into both and examine when to use a tasklet or a deferred task for an asynchronous operation.

A tasklet is a generator function that runs in the application’s event loop until it reaches a yield statement. The yield hands control back to the scheduler, which runs other pending work until the yielded result is ready. A common use case is to fetch an entity from the datastore asynchronously while performing other work, blocking only when that entity is actually needed. Tasklets also behave normally inside a transaction, and can be decorated to run non-transactionally, join the existing transaction, or start a new one. However, they execute during the HTTP request that spawned them, so a long-running tasklet can cause the request to exceed its 60-second deadline. In that sense a tasklet is much like a Thread running in a traditional ThreadPool, and it must be managed similarly.
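To make the yield-and-resume behavior concrete, here is a toy, pure-Python model of an event loop running two tasklets. It is not NDB's scheduler: ToyFuture, run_event_loop, and fetch_and_use are hypothetical names, and every "RPC" resolves instantly. It only illustrates the idea that a tasklet gives up control at each yield and is resumed later with the yielded result.

```python
from collections import deque


class ToyFuture:
    """Hypothetical stand-in for an NDB Future; here the result is known at once."""

    def __init__(self, value):
        self.value = value


def run_event_loop(*tasklets):
    """Round-robin scheduler: resume each generator until its next yield."""
    ready = deque((gen, None) for gen in tasklets)
    while ready:
        gen, send_value = ready.popleft()
        try:
            future = gen.send(send_value)  # run until the next yield
        except StopIteration:
            continue  # this tasklet has finished
        # Resume this tasklet with its result only after the other ready
        # tasklets have had a turn, as an event loop would while an RPC is pending.
        ready.append((gen, future.value))


def fetch_and_use(name, trace):
    """Toy tasklet: two 'fetches', each suspending like `yield key.get_async()`."""
    trace.append(name + ": start")
    entity = yield ToyFuture(name + "-entity")
    trace.append(name + ": got " + entity)
    extra = yield ToyFuture(name + "-extra")
    trace.append(name + ": got " + extra)
```

Running run_event_loop(fetch_and_use("a", trace), fetch_and_use("b", trace)) records "a: start", "b: start", "a: got a-entity", "b: got b-entity", and so on: the two tasklets interleave at every yield, all on a single thread.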

A deferred task is a "fire-and-forget," long-running, idempotent operation whose return value is ignored, and it can execute long after the HTTP request that spawned it. RealMassive often uses deferred tasks for sending out email digests, which must first be split into batches. In this respect, a deferred task is more like a Mapper or Reducer in MapReduce than a Thread. Tasks are added to a push or pull task queue and run according to the rules governing that queue [1]. Since deferred tasks can execute for an extended time (often 10 minutes or more), they behave very differently when they are enqueued from within a transaction. At most 5 transactional tasks may be added in a given transaction, and they are enqueued only if the transaction commits; they then run some time after the commit, not necessarily immediately.
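Because a deferred task may run more than once (for example, when it is retried after a failure), each batch of an email digest must be safe to re-run. The sketch below shows one way to get that property, assuming a record of what has already been sent. split_into_batches, send_digest_batch, sent_log, and outbox are hypothetical names; in a real app the log would have to be durable (for example, stored in the datastore) rather than an in-memory set, and each batch might be handed to deferred.defer.

```python
def split_into_batches(recipients, batch_size):
    """Split a recipient list into consecutive batches of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    return [recipients[i:i + batch_size] for i in range(0, len(recipients), batch_size)]


def send_digest_batch(digest_id, batch, sent_log, outbox):
    """Send one batch; re-running it is harmless because sent pairs are skipped."""
    for email in batch:
        marker = (digest_id, email)
        if marker in sent_log:
            continue  # a retried task must not send a duplicate email
        outbox.append(email)  # stand-in for the real mail API call
        sent_log.add(marker)  # hypothetical log; must be durable in practice
```

Re-running send_digest_batch on a batch that already succeeded appends nothing to outbox, which is the idempotency a retried task needs.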

What’s a developer to do when a long-running operation must execute transactionally? Let’s look at a real problem RealMassive faced, using a normalized, pared-down version of our data model: straightforward, but not oversimplified.

First, a few notes:
  • All entities are tagged with an NDB key to the User who performed an operation on that entity.
  • A Space represents a suite in a Building. It’s located on a floor of the Building and has a size in square feet. It may also have attachments, such as images and videos.
  • A Building represents a physical building, with its own attributes (such as total square footage and owner) and a street address.
  • Spaces and Buildings have mutual references to one another.

from google.appengine.ext import ndb, deferred

class User(ndb.Model):
  username = ndb.StringProperty()
  email = ndb.StringProperty()

class BaseModel(ndb.Model):
  created_by = ndb.KeyProperty(User)
  edited_by = ndb.KeyProperty(User, repeated=True)

class Media(BaseModel):
  # created_by and edited_by are inherited from BaseModel
  url = ndb.StringProperty()

class Address(ndb.Model):
  # embedded in Building via ndb.StructuredProperty
  street = ndb.StringProperty()
  city = ndb.StringProperty()
  state = ndb.StringProperty()
  zipcode = ndb.StringProperty()

class Building(BaseModel):
  address = ndb.StructuredProperty(Address)
  size = ndb.IntegerProperty()
  # kind given as a string: Space is defined below, which avoids a circular reference
  space_keys = ndb.KeyProperty(kind='Space', repeated=True)
  owner = ndb.KeyProperty(User)

  def remove(self):
    # naive cascade: sequential and non-transactional
    for s in self.space_keys:
      s.get().remove()
    self.key.delete()

class Space(BaseModel):
  building = ndb.KeyProperty(Building)
  sf_available = ndb.IntegerProperty()
  floor_number = ndb.IntegerProperty()
  suite_number = ndb.IntegerProperty()
  attachments = ndb.KeyProperty(Media, repeated=True)

  def remove(self):
    for a in self.attachments:
      a.delete()
    self.key.delete()


So back to the original question: how do we handle a long-running operation that must be executed transactionally, such as deleting a Building from NDB with this data model? As is typical of NoSQL datastores, there are no built-in cascading deletes: they must be performed programmatically, in a transaction, descending through the levels of the dependency tree as appropriate. There are a few flaws in our current implementation:

  • For a Building with many Spaces, the removal could raise a DeadlineExceededError, since all of the work runs in the same thread that is serving the HTTP request.
  • The removals are not transactional: if a single operation fails, entities can be orphaned.
  • Media can be orphaned as well (or deleted while another Space still references it), since Media entities are not reference counted.
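To show what reference counting would buy, here is a minimal in-memory sketch of the cascade, assuming plain dicts in place of the datastore. remove_building, remove_space, count_media_refs, and the dict layout are all hypothetical; in NDB the count would have to live on the Media entity and be updated in the same transaction as the Space deletion. Children are removed before their parent, and a Media entity is deleted only when the last Space referencing it is gone.

```python
from collections import Counter


def count_media_refs(spaces):
    """How many Spaces reference each Media id (hypothetical dict layout)."""
    return Counter(m for s in spaces.values() for m in s["attachments"])


def remove_space(space_id, spaces, media, refcounts):
    """Remove a Space, deleting only the Media that no other Space uses."""
    for media_id in spaces[space_id]["attachments"]:
        refcounts[media_id] -= 1
        if refcounts[media_id] == 0:
            del media[media_id]  # last reference gone: safe to delete
            del refcounts[media_id]
    del spaces[space_id]


def remove_building(building_id, buildings, spaces, media, refcounts):
    """Cascade: Building -> its Spaces -> unreferenced Media."""
    for space_id in buildings[building_id]["space_ids"]:
        remove_space(space_id, spaces, media, refcounts)
    del buildings[building_id]
```

A floor plan shared by Spaces in two Buildings survives the removal of the first Building, with its count dropping from 2 to 1.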

Given these flaws, it may seem that the right answer is simply to execute the deletions in deferred tasks and decorate the remove() methods with ndb.transactional. Let’s see what that would look like:

class Building(BaseModel):
  address = ndb.StructuredProperty(Address)
  size = ndb.IntegerProperty()
  # kind given as a string: Space is defined below, which avoids a circular reference
  space_keys = ndb.KeyProperty(kind='Space', repeated=True)
  owner = ndb.KeyProperty(User)

  @ndb.transactional(xg=True)
  def remove(self):
    for s in self.space_keys:
      # get() runs now; Space.remove() runs later, in its own task request
      deferred.defer(s.get().remove, _transactional=True)
    self.key.delete()

class Space(BaseModel):
  building = ndb.KeyProperty(Building)
  sf_available = ndb.IntegerProperty()
  floor_number = ndb.IntegerProperty()
  suite_number = ndb.IntegerProperty()
  attachments = ndb.KeyProperty(Media, repeated=True)

  @ndb.transactional(xg=True)
  def remove(self):
    for a in self.attachments:
      deferred.defer(a.delete, _transactional=True)
    self.key.delete()


There are a number of problems with this approach:

  • If a queue runs tasks concurrently, there’s no way to guarantee deletion order unless the queue is throttled so that only one task runs at a time.
  • Building.remove() could still exceed the deadline, since it calls get() synchronously for every Space before delegating to Space.remove().
  • The semantics of this code are very confusing: it appears that everything happens within one transaction, but the deferred removals actually run later, in separate requests, after the transaction has committed.
  • It won’t scale. If a Building has more than 5 Spaces, Building.remove() exceeds the limit of 5 transactional tasks per transaction. Also, cross-group transactions are limited to 30 seconds of clock time.
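Both the confusing semantics and the scaling limit come from how transactional tasks are handled: they are held back until the transaction commits, and at most 5 may be added per transaction. The toy class below models just those two rules. ToyTransaction and its methods are hypothetical and are not an App Engine API; the real service also rejects a sixth task, but with its own exception type rather than RuntimeError.

```python
class ToyTransaction:
    """Hypothetical model of transactional task enqueueing (not an App Engine API)."""

    MAX_TASKS = 5  # per-transaction limit on transactional tasks

    def __init__(self, queue):
        self.queue = queue
        self.pending = []

    def add_task(self, name):
        if len(self.pending) >= self.MAX_TASKS:
            # The real service rejects the add; this exception type is made up.
            raise RuntimeError("more than %d transactional tasks" % self.MAX_TASKS)
        self.pending.append(name)  # held back: nothing runs yet

    def commit(self):
        self.queue.extend(self.pending)  # tasks are enqueued only on commit
        self.pending = []

    def rollback(self):
        self.pending = []  # tasks from a failed transaction are never enqueued
```

In this model a Building with six Spaces fails on the sixth add_task call, and even a successful removal leaves the queue empty until commit() runs, which is exactly why the deferred removals cannot be part of the transaction that appears to contain them.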

The answer lies in using tasklets for most operations and letting the caller decide whether the operation should be deferred:

class Building(BaseModel):
  address = ndb.StructuredProperty(Address)
  size = ndb.IntegerProperty()
  # kind given as a string: Space is defined below, which avoids a circular reference
  space_keys = ndb.KeyProperty(kind='Space', repeated=True)
  owner = ndb.KeyProperty(User)

  @ndb.transactional_tasklet(xg=True)
  def remove_async(self):
    # fetch every Space in one batch instead of one get() per key
    spaces = yield ndb.get_multi_async(self.space_keys)
    # start all Space removals, then wait for all of them together
    yield [sp.remove_async() for sp in spaces if sp is not None]
    yield self.key.delete_async()

  def remove(self):
    # synchronous wrapper: call directly, or pass to deferred.defer()
    self.remove_async().get_result()

class Space(BaseModel):
  building = ndb.KeyProperty(Building)
  sf_available = ndb.IntegerProperty()
  floor_number = ndb.IntegerProperty()
  suite_number = ndb.IntegerProperty()
  attachments = ndb.KeyProperty(Media, repeated=True)

  @ndb.transactional_tasklet(xg=True)
  def remove_async(self):
    # delete the attachments and the Space itself in one batch
    yield ndb.delete_multi_async(self.attachments + [self.key])

  def remove(self):
    self.remove_async().get_result()
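The key pattern in this version is fan-out then wait: start every Space removal before waiting on any of them, so their RPCs can overlap. In an NDB tasklet this corresponds to yielding a list of Futures. Since NDB cannot run outside App Engine, the sketch below uses Python 3's asyncio as a swapped-in analogy; remove_space, remove_building, and the log are hypothetical, and asyncio.sleep(0) merely stands in for a datastore RPC.

```python
import asyncio


async def remove_space(space_id, log):
    log.append("start " + space_id)
    await asyncio.sleep(0)  # stand-in for a datastore delete RPC
    log.append("done " + space_id)


async def remove_building(space_ids, log):
    # Start every Space removal first, then wait for all of them together;
    # the Building itself is deleted only after its Spaces are gone.
    await asyncio.gather(*(remove_space(s, log) for s in space_ids))
    log.append("building deleted")
```

Running asyncio.run(remove_building(["s1", "s2", "s3"], log)) logs all three "start" entries before any "done" entry, and "building deleted" last. A sequential loop that awaited each removal in turn would instead alternate start and done, which is the shape of the original Building.remove().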


Both tasklets and deferred tasks provide an abstraction that simplifies asynchronous processing in App Engine. If your work runs inside a transaction, or must continue after the HTTP request that started it has ended, it’s important to choose the right technology for the operation.


[1] We are not going to dive into the differences between task queue types or their details here. For more information, please see the citation.