Continuations and Web Servers: A Reaction

Or, you can never negate the same stream twice.

1 Abstract

Continuations and Web Servers is a really fun paper about the design and implementation of long-lived stateful web services in terms of continuations. While the paper is brilliant it has a few qualities which blunt its impact:

  • its example code alludes to a more comprehensive web application framework, which I was not able to set up and tinker with;
  • setting that aside, it is written in Scheme and defers a lot of theory and implementation details to that language’s continuation mechanism. For many, this adds the cognitive load of learning Scheme first.Don’t get it twisted: I love Scheme. But this choice of implementation probably deters people from reading who want more immediate value and still leaves continuations a kind of mysterious ether to be handled with care by Experts.

This has an effect on the reader like that of reading a treasure map: it is a thrilling call to adventure which promises great fortune but is, ultimately, just a fantastic story we set down when we go to work.

We will instead cover the same ideas but in TypeScript, a language in which this author and many other people are paid to write on a daily basis. We will also wrestle continuations out of the shadows and into the harsh Light of Day and implement them ourselves, using ideas from RxJS of all places. Finally we will actually write a small, stateful web service and show that this whole circus was worth it.

And it actually runs and you can use it.

Total amount of code is right at 300 lines, including pretentious formatting. The ideas are big but the implementations are mercifully concise.

1.1 The example app

Actually we will begin with our example application. A session is begun by POSTing a payload { "n": <n> } to /sum. The reply, and each subsequent reply until the last one, will be the sub-total along with a link to send the next number in that session. The number must be positive; if 0 (or less) is received the session is terminated.

Just read the code, the whole point is it’s supposed to be obvious:

class ExampleApp extends Handler {
  protected _sum = 0;

  async handle(request: RequestData) {
    let done = false;
    while (!done) {
      try {
        const { n } = request.parse<{ n: number }>();
        if (n <= 0) {
          done = true;
          request.replyOk({
            total: this._sum,
          });
        } else {
          this._sum += n;
          request = await this.suspend(
            (resumeId) =>
              void request.replyOk({
                subtotal: this._sum,
                resumeAt: `/_r/${resumeId}`,
              })
          );
        }
      } catch (err) {
        request = await this.suspend(
          (resumeId) =>
            void request.replyError(500, {
              error: JSON.stringify(err),
              subtotal: this._sum,
              resumeAt: `/_r/${resumeId}`,
            })
        );
      }
    }
  }
}

void httpRequests(8000).subscribe(
  router({
    "/sum": ExampleApp
  })
);
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":3}' 'http://localhost:8000/sum'
{"subtotal":3,"resumeAt":"/_r/s-1"}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":2}' 'http://localhost:8000/_r/s-1'
{"subtotal":5,"resumeAt":"/_r/s-2"}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":1}' 'http://localhost:8000/_r/s-1'
{"error":"No continuation for s-1."}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":1}' 'http://localhost:8000/_r/s-2'
{"subtotal":6,"resumeAt":"/_r/s-3"}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":0}' 'http://localhost:8000/_r/s-3'
{"total":6}

This code has some desirable properties:

  1. The majority of the code deals directly with the problem domain.
  2. The algorithm is expressed in a straightforward, “direct” style like that of a one-off script, even though it spans multiple asynchronous requests. Legibility is the key to maintaining software with grace.
  3. Behold: no RxJS or weird continuation stuff. Our efforts result in less homework for people just trying to get things done, not more.

suspend is our raison d’être. When called it will suspend the handler execution, register it somewhere with a unique identifier, and pass that identifier to a callback to be invoked before exiting. Thus the handler may suspend itself and reply with the information necessary to be resumed.

2 Continuations from streams

2.1 Will somebody please tell me what continuations are

The original paper begins assuming the reader already has some inkling of what continuations are.

The term continuation is used to refer to the state of the executing program and future work to be done.

Lisp languages, like Scheme, have support for assigning the “current continuation” to a variable that you can invoke as a function. It’s called call-with-current-continuation (or the shorter alternative call/cc). In some senses it’s the most fundamental control flow mechanism: the ability to pick up your current universe or timeline and store it to resume later.

We are going to build our own call/cc using a homemade clone of RxJS. This is for two reasons:

2.1.1 To give some concrete, no-bullshit meaning to continuation jargon

You don’t have to take my word for it but, anecdotally, a lot of material about continuations is written by academic types who are really excited about a code pattern they discovered but have not typically used outside of some carefully written samples in languages primarily used to write code samples, which offer little for blue color morons like me to grip and hold and look at and so on.

2.1.2 To reveal the actual mechanism beneath reactivity and threading

We could just use RxJS but that would shuffle the most critical details, yet again, to a magical black box that most readers will not have the time to review for the handful of crucial definitions buried within.

Instead, because the final product is only ~300 lines of code and because it eliminates an external dependency, we will write our own.

All this is to say, by achieving the aims of the original paper using a reactive streaming toolkit I hope to present a Rosetta stone of sorts between two active fields that don’t seem to be cross-pollinating as much as they should.

2.2 Double Negation and Inverting Control

An Observable inverts control. In the most basic setting, a function A -> B is simply applied to a suitable argument A to produce a concluding B. An Observable, instead, is an A-like thing which consumes functions A -> B as its arguments with a method called subscribe. The ultimate goal is the same: to annihilate an A with some A -> B.

The following is a stripped down imitation of the fundamental RxJS type which achieves this.

interface Observer<A, B = void> {
  next(value?: A): B;
}

class Observable<A> {
  constructor(
    private readonly _action: (c: Observer<A, unknown>) => (() => void) | void
  ) {}

  public subscribe<B = void>(
    observer: Observer<A, B> | ((a: A) => B)
  ): () => void {
    const unsubscribe =
      "function" === typeof observer
        ? this._action({
            next(v: A): B {
              return observer(v);
            },
          })
        : this._action(observer);
    return "function" === typeof unsubscribe
      ? unsubscribe
      : () => {
          return;
        };
  }
}

Borrowing a term from intuitionistic logic, for a given value A its negation is an implication which assumes A and concludes false (it literally negates A).Programming is an application of intuitionistic logic or I wouldn’t mention it.

A continuation is a negation in this sense: it represents future work to be performed, assuming some initial value. And since false implies true, a continuation can produce any result as long as it “doesn’t matter.”Consider how libraries like axios or fetch model remote network requests with callbacks (which return nothing).

Any function-like thing can be considered a negation in this way of course. RxJS has a special type, Observer<A>, which represents work to be done when its next method is called with a suitable argument of type A.

An Observable<A>, being a thing which negates things-which-negate-A, is a double negation.

2.2.1 A quick detour

We will quickly elaborate on these types to build a simple reactive computation á la RxJS. Reactivity is a technique, not a library or feature you have to wait for a language to support. (Also the prune function is useful beyond this section, as we will see.)

interface Operator<A, B> {
  (s: Observable<A>): Observable<B>;
}

function keep<A>(promise: PromiseLike<A>): Observable<A> {
  let canceled = false;
  return new Observable((observer) => {
    void promise.then((value) => {
      if (!canceled) {
        setTimeout(() => void observer.next(value), 0);
      }
    });
    return () => {
      canceled = true;
    };
  });
}

function merge<A>(it: Iterable<Observable<A>>): Observable<A> {
  let unsubbers: (() => void)[] = [];
  return new Observable((observer) => {
    for (const upstream of it) {
      setTimeout(() => {
        const unsubscribe = upstream.subscribe(observer);
        unsubbers.push(unsubscribe);
      }, 0);
    }
    return () => {
      unsubbers.slice().forEach((unsubscribe) => void unsubscribe());
    };
  });
}

// Hang on to this one. We use it later to define `reset`.
function prune<A>(): Operator<A, A> {
  return (source) =>
    new Observable((observer) => {
      const unsubscribe = source.subscribe((value: A): unknown => {
        setTimeout(() => void unsubscribe(), 0);
        return observer.next(value);
      });
      return unsubscribe;
    });
}

const stream1 = merge([
  keep(
    new Promise((resolve) => void setTimeout(() => void resolve("C"), 100))
  ),
  keep(
    new Promise((resolve) => void setTimeout(() => void resolve("A"), 30))
  ),
  keep(
    new Promise((resolve) => void setTimeout(() => void resolve("B"), 50))
  ),
]);
const stream2 = prune()(stream1);
void stream2.subscribe((response) => {
  console.log("Received response:", response);
});
// prints "Received response: A"

This contrived example fires off 3 operations which Promise to resolve at various times in the future, merges their result streams into one, and finally “prunes” the output to only the first result, discarding the latter two.

2.3 Continuations in reactive terms

This is where the link between reactivity and continuations first became apparent to me. With a new function, reset, we can subscribe a Promise callback to an observable computation.

function reset<A>(observable: Observable<A>): Promise<A> {
  const pruned: Observable<A> = prune()(observable);
  return new Promise<A>((resolve) => void pruned.subscribe(resolve));
}

async function resetTest() {
  const result = await reset(stream1);
  console.log("Received response:", result);
}
resetTest();
// This ALSO prints "Received response: A".

The function resetTest is syntactic sugar for the following:

function resetTest_keto() {
  //                  +-- callback
  //                  V
  reset(stream1).then((result) => {
    console.log("Received response:", result);
  });
}

reset effectively subscribes the callback function noted above to an Observable; when combined with async/await syntax sugar what we end up with is, if you really use your imagination, a kind of call/cc for async functions.

Another example will help drive this point home. This next function uses two Observables in an unusual way to achieve non-linear control flow and (promise to) return the number 16.

async function sixteen(): Promise<number> {
  return reset(
    new Observable((kOuter) =>
      void new Observable((kInner) => {
        const eight = kInner.next(4);
        kOuter.next(kInner.next(eight) as number);
      }).subscribe((n) => {
        return (n as number) * 2;
      })
    )
  ) as number;
}
// My types could be improved - in the name of flexibility I am clearly
// throwing away critical and obvious information.

The inner Observable uses its continuation kInner twice, which is bound to an Observer which doubles its argument and returns it. Doubling 4 twice of course yields 16 which is then handed to the outer continuation kOuter.

Here is an important point: kOuter is bound to the resolve callback of a Promise and cannot be invoked more than once. Doing so will result in a runtime error. However, kInner may be used 0-or-more times as it is not bound to a Promise.

Observables may be used to implement complex control flow in ways Promises cannot due to their one-use restriction.

3 The web server library

Equipped with a concrete continuation mechanism we can go about implementing the example from the original paper.

The paper’s example application is a form accepting numbers as input which displays a running total. We will expose a JSON API instead, taking advantage of TypeScript’s built-in support for parsing and serializing JSON to minimize the presentation layer.

3.1 Suspensions and registries

A handful of definitions will clarify the meaning of Handler and its suspend method.

interface Suspension {
  (u: RequestData): unknown;
}

When a service Handler suspends execution its current continuation is registered somewhere with a unique ID so it can be resumed later. We model these suspensions as unary functions accepting RequestData arguments. This makes sense: execution will be resumed by a new request, after all.

type Identifier = string;

interface Registry {
  register(suspension: Suspension): Identifier;
  lookup(route: Identifier): Suspension | null;
  remove(route: Identifier): void;
}

We keep things simple and use strings as our unique identifiers. The interface for our suspension Registry is likewise straightforward: one may register, lookup, or remove suspended executions.

3.2 Handler

We have everything we need now to handle requests which may suspend themselves for later resumption by a new request.

Instead of having an ambient Registry in the global state like the original paper, we opt to inject it as a dependency to each Handler.

// eg, RouteParams<"foo"|"bar"> => { "foo": string, "bar": string }
type RouteParams<TParams extends string> = { [K in TParams]: string };

abstract class Handler<TParams extends string = never> {
  constructor(protected readonly suspensions: Registry) {}

  public abstract handle(
    request: RequestData,
    params: RouteParams<TParams>
  ): Promise<void>;

  protected suspend(fn: (id: Identifier) => void): Promise<RequestData> {
    return reset(
      new Observable<RequestData>((resolve) => {
        const resumeId = this.suspensions.register((x) => resolve.next(x));
        void fn(resumeId);
        return () => void this.suspensions.remove(resumeId);
      })
    );
  }
}

The original paper defines a function show which generates a unique suspension identifier on every request, whether it is needed or not. This code realizes show as two pieces: the handle method which initiates server sessions, and the suspend method which registers the current continuation on-demand. The end result simply means you don’t have to capture the suspension ID at the start of every service handler (but you can).

Regardless, the heart of the original essay and this paper is captured in the suspend method above.

3.2.1 Side show: route parameters

Our example does not make explicit use of it, but our library allows users to specify route parameters using a familiar syntax; the values will be extracted and passed to handle as a second argument.

Eg, if you define a route /p/:foo/:bar and receive a request for p/A/2 then an object { "foo": "A", "bar": "2" } will be passed to handle as its second argument. If none are specified this argument can (and probably should) be ignored. We use it internally and the code is smaller than I would have expected so it is included in the appendix.

3.3 Observing incoming requests

Of course, we may also use Observables the way they’re advertised: to process a stream, this one of HTTP requests. Node’s built-in http library exports a simple callback-based API for writing HTTP services, and inverting callbacks is RxJS’ bread and butter. The following code constructs an Observable of RequestData objects, each of which represents an incoming request in need of a reply.

function httpRequests(port: number): Observable<RequestData> {
  return new Observable((process) => {
    const server = createServer((request, response) => {
      const [route, queryString] = (request.url ?? "").split(/\?/);
      let data = "";
      request.on("data", (payload) => {
        data += payload;
      });
      request.on(
        "end",
        () =>
          void process.next(
            new RequestData(route, response, data, queryString ?? "")
          )
      );
    });
    server.listen(port, () => void console.log(`Listening on port ${port}`));
    return () => void server.close();
  });
}

3.4 Connecting everything with the router

For convenience here is the last part of the example application again:

void httpRequests(8000).subscribe(
  router({
    "/sum": ExampleApp
  })
);

The Observer we subscribe to our httpRequests Observable is the return value of the enigmatic router function. We’ll shootdefine it first and ask questions later.

type Routes = Record<Route, new (r: Registry) => Handler>;

function router(routes: Routes) {
  routes["_r/:id"] = class extends Handler<"id"> {
    async handle(request: RequestData, params: RouteParams<"id">) {
      const resume = this.suspensions.lookup(params.id);
      void (resume
        ? resume(request)
        : request.replyError(404, {
            error: `No continuation for ${params.id}.`
          }));
    }
  };
  return {
    /* This method defines an Observer. */
    async next(request: RequestData) {
      let handler, template;
      for (const [_template, _handler] of Object.entries(routes)) {
        if (matchesPath(_template, request.route)) {
          template = _template;
          handler = _handler;
          break;
        }
      }
      if (!handler || !template) {
        request.replyError(404, {
          error: `No handler found for route ${request.route}`
        });
        return;
      }
      try {
        await new handler(this).handle(
          request,
          pathToObject(template, request.route)
        );
      } catch (err) {.
        request.replyError(500, {
          error: err.toString()
        });
      }
      return;
    },
    /* These properties and methods define a Registry. */
    _store: {},
    _unique: 0,
    register(suspension: Suspension): Identifier {
      const id = `s-${++this._unique}`;
      this._store[id] = suspension;
      return id;
    },
    lookup(route: Identifier): Suspension | null {
      return this._store[route] ?? null;
    },
    remove(route: Identifier): void {
      delete this._store[route];
    }
  };
}

type Router = ReturnType<typeof router>;

Let’s review this.

3.4.1 The resumption Handler

We first insert a special handler for resuming our suspended continuations. The author is pleased he was able to implement this feature using the library itself.

Requests to /_r/:id, where id is a unique string, will invoke the lookup method on Registry to fish out the Suspension and invoke it with the fresh RequestData.

If there is no suspended session a 404 is sent back to the client. Otherwise it is invoked with its argument and original execution will resume.

3.4.2 The RequestData Observer

The next method is what makes the Router object an Observer of RequestData objects. It begins by looking up a handler which matches the requested route. If found the Handler is instantiated; if not, again a 404 is sent sent as the reply.

The helper functions which marshal parameters out of the route template are included at the end in the Appendix.

3.4.3 The Registry

In addition to being an Observer, the Router is also our Registry, as evidenced by it injecting itself into each Handler it instantiates.

Obviously there are reasons to split this Router object up into multiple separate types with separate concerns. The interface guarantees that client code will be called out by the type checker if it tries to do anything besides lookup, register, and remove suspensions, but this really only helps you prevent mistakes in your code; at runtime other people’s code does not have to ask tsc for permission to do anything, of course.

Fortunately this design makes it very easy to modify the route function without needing to update existing applications which use the library, or major overhauls to the rest of the library (not that “the rest” comprises very much).

4 Conclusion

This essay is not meant to be a substitute for the original paper. The author does not have the education, skill, or disposable time to accomplish such. However, by achieving what it does in such a small amount of code which can be run directly with ts-node, I hope I have demonstrated both a powerful technique for structuring web services as well as a novel correspondence between two subjects which are not often discussed together: reactivity and continuations.

5 Appendix: RequestData and route parameters

All of the code above implies a RequestData object which provides structured access to the incoming request data as well as a means of replying. It is not terribly surprising and is presented here for completeness and to show there is nothing up my proverbial sleeves. Additionally I have provided the remaining helpers used by the above code to parse request routes. It’s clever and worth looking at but not the focus of this essay.

type Route = string;

class RequestData {
  public readonly queryParameters: Record<string, string>;
  constructor(
    public readonly route: Route,
    public readonly response: ServerResponse,
    public readonly data: string,
    queryString: string
  ) {
    this.queryParameters = Object.fromEntries(
      queryString.split(/&/).map((x) => x.split(/=/))
    );
  }

  public writeResponse(
    statusCode: number,
    payload: string,
    contentType = "application/json"
  ) {
    this.response.writeHead(statusCode, { "Content-Type": contentType });
    this.response.end(payload);
  }

  protected serializeResponse(message?: unknown): string {
    return "undefined" === typeof message
      ? ""
      : "string" === typeof message
      ? message
      : JSON.stringify(message);
  }

  public replyOk(message?: unknown): void {
    const payload = this.serializeResponse(message);
    return this.writeResponse(message ? 200 : 204, payload);
  }

  public replyError(statusCode: number, message?: unknown): void {
    const payload = this.serializeResponse(message);
    return this.writeResponse(statusCode, payload);
  }

  public parse<T extends object>(): T {
    return JSON.parse(this.data) as T;
  }
}

const captureVariables = /:(\w*)/gi;

function templateToRegExp(template: string) {
  return new RegExp(
    template.replace(
      captureVariables,
      (_, variable) => `(?<${variable}>[^/]*)`
    ) + "$",
    "gi"
  );
}

function matchesPath(template: string, pathname: string) {
  return templateToRegExp(template).test(pathname);
}

function pathToObject(template: string, pathname: string) {
  return (
    templateToRegExp(template).exec(decodeURIComponent(pathname))?.groups ?? {}
  );
}