By Ugorji Nwoke   14 Nov 2011   /blog   appengine geek golang technology

Enable Concurrent Requests in Go App Engine SDK

This details how to enable concurrent requests in the Go App Engine SDK.

UPDATES:
Nov 15:
Added that the Python SDK is currently not thread-safe. This post shows how to make the Go side thread-safe and still test concurrency in your application (even though only one API request is processed at a time).

Background

The Go App Engine SDK has a pretty elegant design, which I wish the Java App Engine SDK had. The full SDK, with support for all App Engine services, is implemented once (in Python), and a new language runtime (like Go) can be introduced quickly by leveraging that investment instead of duplicating it. Brilliant.

It also mirrors production to an extent: there, an App Engine instance runs your application and uses RPC (remote procedure calls) to access the services provided by App Engine.

In this setup, the Python SDK which supports all the App Engine Runtime services acts as two things:

  • A front end.
    Non-app requests are handled by the Python SDK front-end, and app requests get proxied over to the Go Application Instance.
  • RPC Server.
    All App Engine services reside on the Python SDK. The Go Application uses RPC to access those services.

Getting everything to work is pretty neat.

  • The Python Dev Server creates a Go Instance as a child process
  • The Go Instance creates a Server Socket which the Python SDK uses to proxy http requests to it
  • The Python Dev Server creates a Server Socket which the Go Instance uses to send API requests to it
  • Only one request happens at a time, as detailed below.

When a request comes through the Python SDK for the Go App, the following happens:

  • The Python SDK creates a socket to Go Instance and sends the http request to it
  • The Go Instance handles the request.
    For any API calls, it makes a socket connection to the Python SDK API, and sends the request and receives the API response back.
  • The Go Instance sends the response back to the Python SDK
  • The Python SDK forwards the response to the client
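To make the request flow above concrete, here is a minimal sketch in Go of the proxy step alone. It is not the Python SDK's code: both the front end and the Go instance are stand-ins built with net/http/httptest and httputil.NewSingleHostReverseProxy, and the name fetchThroughProxy is made up for illustration.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
)

// fetchThroughProxy starts a backend (standing in for the Go instance) and a
// front end (standing in for the Python SDK proxy), sends one GET for path to
// the front end, and returns the body that comes back through the proxy.
func fetchThroughProxy(path string) (string, error) {
	// Stand-in for the Go instance: it handles the app request.
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "go instance handled %s", r.URL.Path)
	}))
	defer backend.Close()

	target, err := url.Parse(backend.URL)
	if err != nil {
		return "", err
	}

	// Stand-in for the front end: it forwards every request to the backend
	// and relays the response to the client.
	frontend := httptest.NewServer(httputil.NewSingleHostReverseProxy(target))
	defer frontend.Close()

	resp, err := http.Get(frontend.URL + path)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := fetchThroughProxy("/hello")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body)
}
```

The client only ever talks to the front end, which is the role the Python SDK plays for app requests.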

Currently, the design has some limitations that allow only one request to be handled at a time:

  • This implementation uses CGI
  • Socket communication is handled only within the context of a request, i.e. the sockets are not listened to unless a request is in process

Objective:

The objective here is to support concurrent requests. This can be done by making the Python SDK a full proxy, with standalone support as an API RPC server (outside the context of a request).

This will allow more involved testing scenarios:

  • Have tests running directly on the Go server within the regular context of a request (including common work done before and after a handler is called)
  • Have tests using the Python SDK server directly for API calls

To summarize, these are the things we hope to achieve:

  • Let the Python SDK be a true and full proxy to the Go instance, allowing concurrent requests to be proxied and handled.
  • Honor allow_skipped_files flag (to allow skipped files e.g. test files, etc)
    Allowing skipped files in development is very necessary for tests, pre-building, etc.
  • Support testing framework, which can access the Python SDK as an API server without going through a request.
    This way, testing can involve just starting a Python Dev Server (even if no http request happens).

To achieve this, the following changes are necessary:

  • Use the Python 2.7 SDK which allows for concurrent requests
  • Use WSGI (as opposed to CGI) which allows for concurrent requests
  • Have API socket listening and handling be always-on (not only when an HTTP request is in process).
    Use a thread to listen for and respond to all API socket communication.
  • Have a setup/init function that runs when the Python SDK is started for a Go runtime, as opposed to a one-time run when an HTTP request happens

This support comes from minor edits to two files, a more involved edit to one file, and a one-line change in your app.yaml:

  1. google/appengine/ext/go/__init__.py
  2. google/appengine/tools/dev_appserver_main.py (minor edit)
  3. google/appengine/tools/dev_appserver.py (minor edit)
  4. app.yaml (to reference the WSGI app instead of _go_app)

I’ve shared a folder containing all of the changed files online here. Feel free to download the changed files and follow through. For all changes, look for the name “ugorji” in a comment in the file before each change.

But Python SDK does NOT support concurrent requests

Yes. Even with these changes, requests to the Python SDK are still inherently single-threaded:

  • dev_appserver…serve_forever() will handle one request at a time
  • dev_appserver is not thread-safe. When multiple threads handle requests at once, it hits datastore collisions and barfs

Thus, these changes make only the Go side run concurrently. A user can add a back-door HTTP listen port and access the Go instance directly. I do this within an init() or sync.Once.Do(…), surrounded by an if appengine.IsDevAppServer() { … }

http.HandleFunc("/", ...)
go http.ListenAndServe(":9999", nil) // in a goroutine, since ListenAndServe blocks

Also, within your top-level request-handling code, check that the header used for contexts is set. The Python SDK adds this header to the requests it proxies to your application, so when you bypass the Python proxy you must, at a minimum, set it yourself (before creating any appengine.Context).

if r.Header.Get("X-AppEngine-Inbound-AppId") == "" {
    r.Header.Set("X-AppEngine-Inbound-AppId", "dev~app")
}

After that, you can make requests at http://localhost:9999 and get access to your application. Requests through this URL can run concurrently. Access to APIs will still be serial (one at a time), but you can still test concurrency in general for your application. This way, only API requests block and everything else runs concurrently.
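The header check can also live in a small wrapper, so that every handler served through the back door gets it. The following is a minimal sketch under assumptions: withDevAppID, echoAppID and appIDSeenBy are hypothetical names, the handler is a stand-in for your application's own, and the request is driven in memory with net/http/httptest instead of a real listener on port 9999.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// inboundAppIDHeader is the header named in the text above.
const inboundAppIDHeader = "X-AppEngine-Inbound-AppId"

// withDevAppID wraps a handler and fills in the app ID header when the
// request did not come through the Python proxy (so the header is missing).
func withDevAppID(defaultID string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get(inboundAppIDHeader) == "" {
			r.Header.Set(inboundAppIDHeader, defaultID)
		}
		next.ServeHTTP(w, r)
	})
}

// echoAppID is a hypothetical stand-in for the application's real handler.
// It writes back the app ID it observed on the request.
func echoAppID(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, r.Header.Get(inboundAppIDHeader))
}

// appIDSeenBy runs one in-memory request through the wrapped handler and
// returns the app ID that the handler saw. An empty incoming value models a
// request that bypassed the proxy.
func appIDSeenBy(incoming string) string {
	req := httptest.NewRequest("GET", "/", nil)
	if incoming != "" {
		req.Header.Set(inboundAppIDHeader, incoming)
	}
	rec := httptest.NewRecorder()
	withDevAppID("dev~app", http.HandlerFunc(echoAppID)).ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Println(appIDSeenBy(""))          // header missing: default is filled in
	fmt.Println(appIDSeenBy("dev~other")) // header present: left untouched
}
```

A request that already carries the header passes through unchanged, so the same wrapper is harmless for requests that do arrive via the Python proxy.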

When the Python SDK becomes thread-safe, we only need to make a few changes to take advantage of it.

  1. On the Go App Engine end, update the following:
    1. appengine_internal.InitAPI:
      just store the network address for the API server
    2. appengine_internal.call:
      open/close a connection to API server for each request
  2. On Python SDK extension:
    1. ext/go/__init__.py:
      change DelegateServer to listen(n) where n is number of concurrent requests supported e.g. listen(10)
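The Go-side half of that plan (keep only the address, then open and close a connection per call) might look roughly like the sketch below. Everything in it is an assumption made for illustration: apiClient, startEchoServer and concurrentCalls are hypothetical names, the line-based "protocol" is invented, and the real appengine_internal wire format is not shown in this post.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// apiClient only remembers where the API server lives; it holds no
// long-lived connection.
type apiClient struct {
	network string // e.g. "tcp" or "unix"
	addr    string
}

// call opens a fresh connection, sends one request line, reads one response
// line, and closes the connection again.
func (c *apiClient) call(req string) (string, error) {
	conn, err := net.Dial(c.network, c.addr)
	if err != nil {
		return "", err
	}
	defer conn.Close()

	if _, err := fmt.Fprintf(conn, "%s\n", req); err != nil {
		return "", err
	}
	resp, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return "", err
	}
	return strings.TrimSuffix(resp, "\n"), nil
}

// startEchoServer is a hypothetical stand-in for a thread-safe API server.
// It answers the single line sent on each connection with "ok:" + line.
func startEchoServer() (net.Listener, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener was closed
			}
			go func(c net.Conn) {
				defer c.Close()
				line, err := bufio.NewReader(c).ReadString('\n')
				if err != nil {
					return
				}
				fmt.Fprintf(c, "ok:%s", line) // line still ends in "\n"
			}(conn)
		}
	}()
	return ln, nil
}

// concurrentCalls issues n calls at once, each on its own connection, and
// returns how many came back with the expected response.
func concurrentCalls(n int) (int, error) {
	ln, err := startEchoServer()
	if err != nil {
		return 0, err
	}
	defer ln.Close()
	client := &apiClient{network: "tcp", addr: ln.Addr().String()}

	results := make(chan bool, n)
	for i := 0; i < n; i++ {
		go func(i int) {
			req := fmt.Sprintf("req-%d", i)
			resp, err := client.call(req)
			results <- err == nil && resp == "ok:"+req
		}(i)
	}
	ok := 0
	for i := 0; i < n; i++ {
		if <-results {
			ok++
		}
	}
	return ok, nil
}

func main() {
	ok, err := concurrentCalls(5)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%d of 5 calls succeeded\n", ok)
}
```

Because each call owns its connection, concurrent requests never share socket state; the cost is one connect and close per API call.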

Tangent: Testing Support

This is a tangent … more details here

Gotchas

For some reason, excessive logging calls, especially when logging things which don’t fit nicely in ASCII, break the dev server. That’s why all the logging calls I added are commented out.

Walkthrough of Changes:

As mentioned earlier, we need to make minor edits to two files, a more involved edit to one file, and a one-line change in your app.yaml:

  1. google/appengine/ext/go/__init__.py
  2. google/appengine/tools/dev_appserver_main.py (minor edit)
  3. google/appengine/tools/dev_appserver.py (minor edit)
  4. app.yaml (to reference the WSGI app instead of _go_app)


For each change, we:

  • highlight the file in bold
  • show description of the changes in italics
  • then the change after that

google/appengine/tools/dev_appserver_main.py:

# Before call to http_server.serve_forever() top-level try block:
# Call go_init(...) to initialize everything
  if appinfo.runtime == 'go':
    from google.appengine.ext.go import go_init
    go_init(port, root_path, allow_skipped_files)

google/appengine/tools/dev_appserver.py:

# Ensure go uses py27 handler
# In def ExecuteCGI:
      # if handler_path and config and config.runtime == 'python27':
      if handler_path and config and (config.runtime == 'go' or config.runtime == 'python27'):
        reset_modules = exec_py27_handler(config, handler_path, cgi_path, hook)

# To remove vestiges of old CGI model (ie execute_go_cgi)
# Remove "if handler_path == '_go_app':" block
  # if handler_path == '_go_app':
  #   from google.appengine.ext.go import execute_go_cgi
  #   return execute_go_cgi(root_path, handler_path, cgi_path,
  #       env, infile, outfile)

google/appengine/ext/go/__init__.py:

# Add to end of imports
import threading
import StringIO


# At top, after imports, define variable
# It is set later on by the go_init call (based on --allow_skipped_files flag to python sdk)
ALLOW_SKIPPED_FILES = False

# Define function go_safe_make_and_run
# This uses a lock to ensure that make_and_run is not called in parallel

_make_and_run_lock = threading.Lock()
def go_safe_make_and_run():
  # Serialize builds: only one thread may run make_and_run at a time.
  with _make_and_run_lock:
    GO_APP.make_and_run()


# At bottom: define new function: go_init(...)
# It takes the one-time setup out of execute_go_cgi and makes it a function called by dev_appserver_main.py
# It also uses a single daemon thread to asynchronously manage all API socket communication
def go_init(port, root_path, allow_skipped_files):
  global RAPI_HANDLER, SOCKET_API, SOCKET_HTTP, GAB_WORK_DIR, GO_APP, ALLOW_SKIPPED_FILES
  if not RAPI_HANDLER:
    user_port = '%s_%s' % (getpass.getuser(), port)
    SOCKET_API = SOCKET_API % user_port
    SOCKET_HTTP = SOCKET_HTTP % user_port
    GAB_WORK_DIR = gab_work_dir() % user_port
    cleanup()
    atexit.register(cleanup)
    RAPI_HANDLER = handler.ApiCallHandler()
    GO_APP = GoApp(root_path)
    ALLOW_SKIPPED_FILES = allow_skipped_files
    logging.info("Calling Delegate Server Now")
    ds = DelegateServer()
    logging.info("Socket server: %s: %s", SOCKET_API, os.stat(SOCKET_API))
    def apiConnLoop():
      while ds.connected or ds.accepting:
        asyncore.loop(map=ds._map, count=1)
    th = threading.Thread(target=apiConnLoop)
    th.setDaemon(True)
    th.start()

# To remove vestiges of old CGI model (ie execute_go_cgi)
# Completely remove execute_go_cgi method

# *** If you don't remove execute_go_cgi method, ...
# In execute_go_cgi(root_path, handler_path, cgi_path, env, infile, outfile):
# Since setup was moved to a global function, comment out the "if not RAPI_HANDLER:" block
  #   global RAPI_HANDLER, GAB_WORK_DIR, SOCKET_HTTP, SOCKET_API, GO_APP
  #   if not RAPI_HANDLER:
  #     user_port = '%s_%s' % (getpass.getuser(), env['SERVER_PORT'])
  #     GAB_WORK_DIR = gab_work_dir() % user_port
  #     SOCKET_HTTP = SOCKET_HTTP % user_port
  #     SOCKET_API = SOCKET_API % user_port
  #     atexit.register(cleanup)
  #     DelegateServer()
  #     RAPI_HANDLER = handler.ApiCallHandler()
  global HEADER_MAP
  go_safe_make_and_run()

# *** If you don't remove execute_go_cgi method, at least ...
# In execute_go_cgi, update call to asyncore.loop(...), to only use current map
# This way, it only loops over the socket for its request
    x = DelegateClient(http_req)
    while not x.closed:
      asyncore.loop(map=x._map, count=1)

# In RemoteAPIHandler(asyncore.dispatcher_with_send):
# Update constructor to take a map, so it can store its request map for use by asyncore.loop
class RemoteAPIHandler(asyncore.dispatcher_with_send):
  def __init__(self, sock, map=None):
    asyncore.dispatcher_with_send.__init__(self, sock, map=map)

# In DelegateServer.handle_accept
# Pass the DelegateServer map in to constructor of RemoteAPIHandler
# This way, we only have 1 thread that loops over all API Socket communication
    RemoteAPIHandler(sock, self._map)


# *** Do this only if python sdk is still not threadsafe ***
# In DelegateServer.__init__, it should only listen to 1 connection max
# This way, even with multiple connections, only 1 is served at a time
    self.listen(1)


# In find_app_files(basedir), add check for ALLOW_SKIPPED_FILES:
# This way, files matching skip_files stay usable in the dev environment (if the --allow_skipped_files flag is given)
# This is necessary to allow for testing, and for using artifacts which should not make it to production
      if not ALLOW_SKIPPED_FILES and APP_CONFIG.skip_files.match(ename):
        continue

# Support WSGI for python2.7 by defining the WSGI application as a single callable function.
# Note that it doesn't use webob, just the plain WSGI protocol.
def go_wsgi(environ, start_response):
  global HEADER_MAP
  go_safe_make_and_run()

  request_method = environ['REQUEST_METHOD']
  server_protocol = environ['SERVER_PROTOCOL']
  url = environ.get('SCRIPT_NAME','')
  url += environ.get('PATH_INFO','')
  if environ.get('QUERY_STRING'):
    url += '?' + environ['QUERY_STRING']
  
  body = ''
  length = 0
  if environ.get('CONTENT_LENGTH'):
    length = int(environ['CONTENT_LENGTH'])
    body = environ['wsgi.input'].read(length)
  
  headers = {}
  for k, v in environ.items():
    if k in HEADER_MAP:
      headers[HEADER_MAP[k]] = v
    elif k.startswith('HTTP_'):
      hk = k[5:].replace("_", "-")
      if hk.title() == 'Connection':
        continue
      headers[hk] = v
  headers['Content-Length'] = str(length)
  headers['Connection'] = 'close'
  #logging.info("headers: %s", headers)
  
  hrl = []
  hrl.append(request_method + ' ' + url + ' ' + server_protocol)
  for k, v in headers.items():
    hrl.append(k + ': ' + v)
  hrl.append("")
  hrl.append(body)
  http_req = '\r\n'.join(hrl)
  
  x = DelegateClient(http_req)
  #logging.info("x.closed: %s, map: %s", x.closed, x._map)
  while not x.closed:
    asyncore.loop(map=x._map, count=1)
  
  res = x.result
  if res.startswith('HTTP/1.0 ') or res.startswith('HTTP/1.1 '):
    fp = StringIO.StringIO(res)
    headerlist = []
    line1 = fp.readline().strip()
    while True:
      line = fp.readline().strip()
      if not line:
        break
      header_name, value = line.split(':', 1)
      headerlist.append((header_name, value.strip()))
    body = fp.read()
    start_response(line1[9:], headerlist)
    return [body]
  else:
    start_response("500 Internal Server Error", [('Content-Type', 'text/plain')])
    return ['Internal Server Error']

# Define a top-level callable in the module that can be referred to in app.yaml
# This one must handle exceptions and populate the result appropriately
def GO_WSGI_APP(environ, start_response):
  x = None
  try:
    x = go_wsgi(environ, start_response)
  except dev_appserver.CompileError, ex:
    start_response("500 Internal Server Error", [('Content-Type', 'text/plain')])
    x = [ex.text]
  except Exception, ex:
    start_response("500 Internal Server Error", [('Content-Type', 'text/plain')])
    x = [str(ex)]
  return x

app.yaml

# use the WSGI app as your script, instead of _go_app
- url: /.*
  script: google.appengine.ext.go.GO_WSGI_APP
  # script: _go_app


© Ugorji Nwoke