Continuations and Web Servers: A Reaction
Or, you can never negate the same stream twice.
1 Abstract
Continuations and Web Servers is a really fun paper about the design and implementation of long-lived stateful web services in terms of continuations. While the paper is brilliant, it has a few qualities which blunt its impact:
- its example code alludes to a more comprehensive web application framework, which I was not able to set up and tinker with;
- setting that aside, it is written in Scheme and defers a lot of theory and implementation details to that language’s continuation mechanism. For many, this adds the cognitive load of learning Scheme first. (Don’t get it twisted: I love Scheme.) But this choice of implementation probably deters people from reading who want more immediate value, and it still leaves continuations a kind of mysterious ether to be handled with care by Experts.
This has an effect on the reader like that of reading a treasure map: it is a thrilling call to adventure which promises great fortune but is, ultimately, just a fantastic story we set down when we go to work.
We will instead cover the same ideas but in TypeScript, a language in which this author and many other people are paid to write on a daily basis. We will also wrestle continuations out of the shadows and into the harsh Light of Day and implement them ourselves, using ideas from RxJS of all places. Finally we will actually write a small, stateful web service and show that this whole circus was worth it.
And it actually runs and you can use it.
The total amount of code is right around 300 lines, including pretentious formatting. The ideas are big but the implementations are mercifully concise.
1.1 The example app
Actually we will begin with our example application. A session is begun by POSTing a payload { "n": <n> } to /sum. The reply, and each subsequent reply until the last one, will be the sub-total along with a link to send the next number in that session. The number must be positive; if 0 (or less) is received the session is terminated.
Just read the code; the whole point is that it’s supposed to be obvious:
class ExampleApp extends Handler {
  protected _sum = 0;
  async handle(request: RequestData) {
    let done = false;
    while (!done) {
      try {
        const { n } = request.parse<{ n: number }>();
        if (n <= 0) {
          done = true;
          request.replyOk({
            total: this._sum,
          });
        } else {
          this._sum += n;
          request = await this.suspend(
            (resumeId) =>
              void request.replyOk({
                subtotal: this._sum,
                resumeAt: `/_r/${resumeId}`,
              })
          );
        }
      } catch (err) {
        request = await this.suspend(
          (resumeId) =>
            void request.replyError(500, {
              error: JSON.stringify(err),
              subtotal: this._sum,
              resumeAt: `/_r/${resumeId}`,
            })
        );
      }
    }
  }
}

void httpRequests(8000).subscribe(
  router({
    "/sum": ExampleApp
  })
);
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":3}' 'http://localhost:8000/sum'
{"subtotal":3,"resumeAt":"/_r/s-1"}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":2}' 'http://localhost:8000/_r/s-1'
{"subtotal":5,"resumeAt":"/_r/s-2"}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":1}' 'http://localhost:8000/_r/s-1'
{"error":"No continuation for s-1."}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":1}' 'http://localhost:8000/_r/s-2'
{"subtotal":6,"resumeAt":"/_r/s-3"}
$ curl -XPOST -H'Content-Type: application/json' -d'{"n":0}' 'http://localhost:8000/_r/s-3'
{"total":6}
This code has some desirable properties:
- The majority of the code deals directly with the problem domain.
- The algorithm is expressed in a straightforward, “direct” style like that of a one-off script, even though it spans multiple asynchronous requests. Legibility is the key to maintaining software with grace.
- Behold: no RxJS or weird continuation stuff. Our efforts result in less homework for people just trying to get things done, not more.
suspend is our raison d’être. When called it will suspend the handler execution, register it somewhere with a unique identifier, and pass that identifier to a callback to be invoked before exiting. Thus the handler may suspend itself and reply with the information necessary to be resumed.
2 Continuations from streams
2.1 Will somebody please tell me what continuations are
The original paper begins assuming the reader already has some inkling of what continuations are.
The term continuation is used to refer to the state of the executing program and future work to be done.
Some Lisp dialects, notably Scheme, have support for assigning the “current continuation” to a variable that you can invoke as a function. It’s called call-with-current-continuation (or the shorter alternative call/cc). In some senses it’s the most fundamental control flow mechanism: the ability to pick up your current universe or timeline and store it to resume later.
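If that definition still feels slippery, it may help to see continuations as plain functions first. The following is only a sketch in continuation-passing style, not anything from the original paper; Cont, addK, squareK, trace and stash are names I made up for the illustration. Instead of returning a value, each helper hands its result to “the rest of the program,” which is passed in as an argument k.

```typescript
// A continuation is "the rest of the program", reified as a function.
type Cont<A> = (value: A) => void;

// Hypothetical helpers: instead of returning, each one hands its result
// to the continuation `k`.
function addK(a: number, b: number, k: Cont<number>): void {
  k(a + b);
}

function squareK(n: number, k: Cont<number>): void {
  k(n * n);
}

const trace: number[] = [];

// (1 + 2)^2, with every "and then..." spelled out as a continuation.
addK(1, 2, (sum) => squareK(sum, (result) => trace.push(result)));

// A continuation is an ordinary value, so it can be stored away and
// invoked later, even more than once.
const stash: Cont<number>[] = [];
addK(1, 2, (sum) => {
  stash.push((n) => trace.push(sum + n));
});
for (const k of stash) {
  k(10);
  k(20);
}
```

After this runs, trace holds 9, 13 and 23: the stored continuation was invoked twice, each time picking up from the same point with sum still equal to 3.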
We are going to build our own call/cc using a homemade clone of RxJS. This is for two reasons:
2.1.1 To give some concrete, no-bullshit meaning to continuation jargon
You don’t have to take my word for it but, anecdotally, a lot of material about continuations is written by academic types who are really excited about a code pattern they discovered but have not typically used it outside of some carefully written samples, in languages primarily used to write code samples, which offer little for blue-collar morons like me to grip and hold and look at and so on.
2.1.2 To reveal the actual mechanism beneath reactivity and threading
We could just use RxJS but that would shuffle the most critical details, yet again, to a magical black box that most readers will not have the time to review for the handful of crucial definitions buried within.
Instead, because the final product is only ~300 lines of code and because it eliminates an external dependency, we will write our own.
All this is to say, by achieving the aims of the original paper using a reactive streaming toolkit I hope to present a Rosetta stone of sorts between two active fields that don’t seem to be cross-pollinating as much as they should.
2.2 Double Negation and Inverting Control
An Observable inverts control. In the most basic setting, a function A -> B is simply applied to a suitable argument A to produce a concluding B. An Observable, instead, is an A-like thing which consumes functions A -> B as its arguments with a method called subscribe. The ultimate goal is the same: to annihilate an A with some A -> B.
The following is a stripped down imitation of the fundamental RxJS type which achieves this.
interface Observer<A, B = void> {
  next(value?: A): B;
}
class Observable<A> {
  constructor(
    private readonly _action: (c: Observer<A, unknown>) => (() => void) | void
  ) {}
  public subscribe<B = void>(
    observer: Observer<A, B> | ((a: A) => B)
  ): () => void {
    const unsubscribe =
      "function" === typeof observer
        ? this._action({
            next(v: A): B {
              return observer(v);
            },
          })
        : this._action(observer);
    return "function" === typeof unsubscribe
      ? unsubscribe
      : () => {
          return;
        };
  }
}
Borrowing a term from intuitionistic logic, for a given value A its negation is an implication which assumes A and concludes false (it literally negates A). (Programming is an application of intuitionistic logic or I wouldn’t mention it.)
A continuation is a negation in this sense: it represents future work to be performed, assuming some initial value. And since false implies anything, a continuation can produce any result as long as it “doesn’t matter.” (Consider how libraries like axios or fetch model remote network requests with callbacks which return nothing.)
Any function-like thing can be considered a negation in this way of course. RxJS has a special type, Observer<A>, which represents work to be done when its next method is called with a suitable argument of type A.
An Observable<A>, being a thing which negates things-which-negate-A, is a double negation.
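The same idea can be written down as types. This is a rough sketch, with void standing in for “false” and with Not, NotNot, embed and countdown being hypothetical names rather than anything RxJS exports.

```typescript
// `void` stands in for "false": a negation consumes an A and yields
// nothing we care about.
type Not<A> = (a: A) => void;

// A double negation consumes consumers of A.
type NotNot<A> = (k: Not<A>) => void;

// Every plain value embeds into its double negation: hand it to whoever asks.
function embed<A>(a: A): NotNot<A> {
  return (k) => k(a);
}

// Unlike a plain value, a double negation decides when and how often to
// call its consumer, which is the freedom a stream needs.
const countdown: NotNot<number> = (k) => {
  for (const n of [3, 2, 1]) {
    k(n);
  }
};

const seen: number[] = [];
embed(42)((n) => seen.push(n));
countdown((n) => seen.push(n));
```

Here embed(42) calls its consumer exactly once while countdown calls it three times, so seen ends up as 42, 3, 2, 1. The Observable class above is the same shape as NotNot with an unsubscribe function bolted on.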
2.2.1 A quick detour
We will quickly elaborate on these types to build a simple reactive computation à la RxJS. Reactivity is a technique, not a library or feature you have to wait for a language to support. (Also the prune function is useful beyond this section, as we will see.)
interface Operator<A, B> {
  (s: Observable<A>): Observable<B>;
}
function keep<A>(promise: PromiseLike<A>): Observable<A> {
  let canceled = false;
  return new Observable((observer) => {
    void promise.then((value) => {
      if (!canceled) {
        setTimeout(() => void observer.next(value), 0);
      }
    });
    return () => {
      canceled = true;
    };
  });
}
function merge<A>(it: Iterable<Observable<A>>): Observable<A> {
  let unsubbers: (() => void)[] = [];
  return new Observable((observer) => {
    for (const upstream of it) {
      setTimeout(() => {
        const unsubscribe = upstream.subscribe(observer);
        unsubbers.push(unsubscribe);
      }, 0);
    }
    return () => {
      unsubbers.slice().forEach((unsubscribe) => void unsubscribe());
    };
  });
}
// Hang on to this one. We use it later to define `reset`.
function prune<A>(): Operator<A, A> {
  return (source) =>
    new Observable((observer) => {
      const unsubscribe = source.subscribe((value: A): unknown => {
        setTimeout(() => void unsubscribe(), 0);
        return observer.next(value);
      });
      return unsubscribe;
    });
}
const stream1 = merge([
  keep(
    new Promise((resolve) => void setTimeout(() => void resolve("C"), 100))
  ),
  keep(
    new Promise((resolve) => void setTimeout(() => void resolve("A"), 30))
  ),
  keep(
    new Promise((resolve) => void setTimeout(() => void resolve("B"), 50))
  ),
]);
const stream2 = prune()(stream1);
void stream2.subscribe((response) => {
  console.log("Received response:", response);
});
// prints "Received response: A"
This contrived example fires off 3 operations which Promise to resolve at various times in the future, merges their result streams into one, and finally “prunes” the output to only the first result, discarding the latter two.
2.3 Continuations in reactive terms
This is where the link between reactivity and continuations first became apparent to me. With a new function, reset, we can subscribe a Promise callback to an observable computation.
function reset<A>(observable: Observable<A>): Promise<A> {
  const pruned: Observable<A> = prune()(observable);
  return new Promise<A>((resolve) => void pruned.subscribe(resolve));
}
async function resetTest() {
  const result = await reset(stream1);
  console.log("Received response:", result);
}
resetTest();
// This ALSO prints "Received response: A".
The function resetTest is syntactic sugar for the following:
function resetTest_keto() {
  //                  +-- callback
  //                  V
  reset(stream1).then((result) => {
    console.log("Received response:", result);
  });
}
reset effectively subscribes the callback function noted above to an Observable; when combined with async/await syntax sugar what we end up with is, if you really use your imagination, a kind of call/cc for async functions.
Another example will help drive this point home. This next function uses two Observables in an unusual way to achieve non-linear control flow and (promise to) return the number 16.
async function sixteen(): Promise<number> {
  return reset(
    new Observable((kOuter) =>
      void new Observable((kInner) => {
        const eight = kInner.next(4);
        kOuter.next(kInner.next(eight) as number);
      }).subscribe((n) => {
        return (n as number) * 2;
      })
    )
  ) as Promise<number>;
}
// My types could be improved - in the name of flexibility I am clearly
// throwing away critical and obvious information.
The inner Observable uses its continuation kInner twice, which is bound to an Observer which doubles its argument and returns it. Doubling 4 twice of course yields 16 which is then handed to the outer continuation kOuter.
Here is an important point: kOuter is bound to the resolve callback of a Promise and can deliver a value only once. A Promise settles a single time; any later call to resolve is silently ignored. However, kInner may be used 0-or-more times as it is not bound to a Promise.
Observables may be used to implement complex control flow in ways Promises cannot due to their one-use restriction.
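A small experiment makes the difference tangible. This sketch assumes nothing beyond the standard Promise constructor; settled, received and observer are illustrative names, not part of the library.

```typescript
// A Promise settles once: the first call to resolve wins and a second
// call changes nothing.
const settled = new Promise<number>((resolve) => {
  resolve(1);
  resolve(2);
});

// A plain observer function has no such restriction. Like kInner, it can
// be invoked as many times as we like and its return value can be reused.
const received: number[] = [];
const observer = (n: number): number => {
  received.push(n);
  return n * 2;
};
const sixteen = observer(observer(4));

void settled.then((value) => {
  console.log("Promise settled with", value, "and observer saw", received);
});
```

The observer records 4 and then 8 and hands back 16, while anything awaiting settled only ever sees the first value.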
3 The web server library
Equipped with a concrete continuation mechanism we can go about implementing the example from the original paper.
The paper’s example application is a form accepting numbers as input which displays a running total. We will expose a JSON API instead, taking advantage of JavaScript’s built-in support for parsing and serializing JSON (which TypeScript inherits) to minimize the presentation layer.
3.1 Suspensions and registries
A handful of definitions will clarify the meaning of Handler and its suspend method.
interface Suspension {
  (u: RequestData): unknown;
}
When a service Handler suspends execution its current continuation is registered somewhere with a unique ID so it can be resumed later. We model these suspensions as unary functions accepting RequestData arguments. This makes sense: execution will be resumed by a new request, after all.
type Identifier = string;
interface Registry {
  register(suspension: Suspension): Identifier;
  lookup(route: Identifier): Suspension | null;
  remove(route: Identifier): void;
}
We keep things simple and use strings as our unique identifiers. The interface for our suspension Registry is likewise straightforward: one may register, lookup, or remove suspended executions.
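For a feel of how little an implementation needs, here is one possible Registry backed by a Map. MapRegistry is a hypothetical class for illustration, and RequestData is replaced by a stand-in type so the sketch compiles on its own; the registry this essay actually uses appears later as part of the router.

```typescript
// Stand-in so the sketch compiles alone; the real RequestData class is in
// the appendix.
type RequestData = { data: string };

interface Suspension {
  (u: RequestData): unknown;
}
type Identifier = string;
interface Registry {
  register(suspension: Suspension): Identifier;
  lookup(route: Identifier): Suspension | null;
  remove(route: Identifier): void;
}

// Hypothetical in-memory implementation backed by a Map.
class MapRegistry implements Registry {
  private readonly store = new Map<Identifier, Suspension>();
  private unique = 0;

  register(suspension: Suspension): Identifier {
    const id = `s-${++this.unique}`;
    this.store.set(id, suspension);
    return id;
  }
  lookup(route: Identifier): Suspension | null {
    return this.store.get(route) ?? null;
  }
  remove(route: Identifier): void {
    this.store.delete(route);
  }
}

const registry = new MapRegistry();
const firstId = registry.register((request) => request.data.length);
const found = registry.lookup(firstId);
const answer = found ? found({ data: "hello" }) : null;
registry.remove(firstId);
const afterRemove = registry.lookup(firstId);
```

The first registered suspension gets the identifier s-1, invoking it with the stand-in request yields 5, and after remove the lookup comes back null.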
3.2 Handler
We have everything we need now to handle requests which may suspend themselves for later resumption by a new request.
Instead of having an ambient Registry in the global state like the original paper, we opt to inject it as a dependency to each Handler.
// eg, RouteParams<"foo"|"bar"> => { "foo": string, "bar": string }
type RouteParams<TParams extends string> = { [K in TParams]: string };
abstract class Handler<TParams extends string = never> {
  constructor(protected readonly suspensions: Registry) {}
  public abstract handle(
    request: RequestData,
    params: RouteParams<TParams>
  ): Promise<void>;

  protected suspend(fn: (id: Identifier) => void): Promise<RequestData> {
    return reset(
      new Observable<RequestData>((resolve) => {
        const resumeId = this.suspensions.register((x) => resolve.next(x));
        void fn(resumeId);
        return () => void this.suspensions.remove(resumeId);
      })
    );
  }
}
The original paper defines a function show which generates a unique suspension identifier on every request, whether it is needed or not. This code realizes show as two pieces: the handle method which initiates server sessions, and the suspend method which registers the current continuation on-demand. The end result simply means you don’t have to capture the suspension ID at the start of every service handler (but you can).
Regardless, the heart of the original paper and this essay is captured in the suspend method above.
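To see the mechanism without any HTTP in the way, the sketch below replays the example session with plain numbers standing in for requests and a Map standing in for the Registry. Everything here (pending, suspend as a free function, sumSession, tick, demo) is a simplified, hypothetical stand-in for the real classes, and it uses a bare Promise rather than reset and an Observable.

```typescript
// Suspended computations, keyed by the id a client would use to resume them.
const pending = new Map<string, (n: number) => void>();
let counter = 0;
const replies: string[] = [];

// Register "the rest of the computation" under a fresh id, announce the id,
// and return a Promise that settles when somebody resumes it.
function suspend(announce: (id: string) => void): Promise<number> {
  return new Promise<number>((resolve) => {
    const id = `s-${++counter}`;
    pending.set(id, (n) => {
      pending.delete(id); // one-shot: a resumed id cannot be reused
      resolve(n);
    });
    announce(id);
  });
}

// Direct-style session logic, suspended at every `await`.
async function sumSession(first: number): Promise<number> {
  let sum = 0;
  let n = first;
  while (n > 0) {
    sum += n;
    const subtotal = sum;
    n = await suspend((id) =>
      replies.push(`subtotal ${subtotal}, resume at ${id}`)
    );
  }
  replies.push(`total ${sum}`);
  return sum;
}

// Let pending continuations run before the next simulated request arrives.
const tick = (): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, 0));

// Drive the session by hand, the way successive requests would.
async function demo(): Promise<number> {
  const session = sumSession(3); // runs until the first suspend
  pending.get("s-1")?.(2);
  await tick();
  pending.get("s-2")?.(1);
  await tick();
  pending.get("s-3")?.(0); // zero ends the session
  return session;
}

const finalTotal = demo();
```

The replies collected this way mirror the curl transcript from the beginning of the essay: subtotals of 3, 5 and 6, each with a fresh resume id, followed by a total of 6.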
3.2.1 Side show: route parameters
Our example does not make explicit use of it, but our library allows users to specify route parameters using a familiar syntax; the values will be extracted and passed to handle as a second argument.
Eg, if you define a route /p/:foo/:bar and receive a request for /p/A/2 then an object { "foo": "A", "bar": "2" } will be passed to handle as its second argument. If none are specified this argument can (and probably should) be ignored. We use it internally and the code is smaller than I would have expected so it is included in the appendix.
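Under the assumption that a route template only contains :name segments, the extraction can be sketched with named capture groups. This is a condensed variant of the helpers in the appendix, not a replacement for them.

```typescript
// Turn each ":name" segment into a named capture group that matches one
// path segment, and anchor the pattern at the end of the path.
function templateToRegExp(template: string): RegExp {
  return new RegExp(
    template.replace(/:(\w+)/g, (_, name: string) => `(?<${name}>[^/]*)`) + "$"
  );
}

// Read the captured values back as a plain object, or {} when the
// template declares no parameters.
function pathToObject(
  template: string,
  pathname: string
): Record<string, string> {
  return templateToRegExp(template).exec(pathname)?.groups ?? {};
}

const params = pathToObject("/p/:foo/:bar", "/p/A/2");
const none = pathToObject("/sum", "/sum");
```

With the route from the paragraph above, params.foo is "A" and params.bar is "2". Note that every value arrives as a string, so a handler expecting a number has to convert it itself.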
3.3 Observing incoming requests
Of course, we may also use Observables the way they’re advertised: to process a stream, this one of HTTP requests. Node’s built-in http library exports a simple callback-based API for writing HTTP services, and inverting callbacks is RxJS’ bread and butter. The following code constructs an Observable of RequestData objects, each of which represents an incoming request in need of a reply.
import { createServer } from "http";

function httpRequests(port: number): Observable<RequestData> {
  return new Observable((process) => {
    const server = createServer((request, response) => {
      const [route, queryString] = (request.url ?? "").split(/\?/);
      let data = "";
      // Accumulate the request body as it streams in.
      request.on("data", (payload) => {
        data += payload;
      });
      // Once the body is complete, emit the request downstream.
      request.on(
        "end",
        () =>
          void process.next(
            new RequestData(route, response, data, queryString ?? "")
          )
      );
    });
    server.listen(port, () => void console.log(`Listening on port ${port}`));
    return () => void server.close();
  });
}
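The inversion itself does not depend on Node. In the sketch below, createFakeServer is a made-up stand-in for a callback-based API like createServer, and Subscribe is a function-only encoding of an Observable; neither is part of the library.

```typescript
// Hypothetical stand-in for a callback-based API: the caller supplies one
// handler and the "server" invokes it for every request until closed.
interface FakeServer {
  simulate(url: string): void;
  close(): void;
}
function createFakeServer(handler: (url: string) => void): FakeServer {
  let open = true;
  return {
    simulate(url) {
      if (open) handler(url);
    },
    close() {
      open = false;
    },
  };
}

// Function-only encoding of an Observable: subscribing means passing a
// callback, and an unsubscribe function comes back.
type Subscribe<A> = (next: (value: A) => void) => () => void;

// The inversion: the handler given to the server simply forwards each
// request to whoever subscribed.
const servers: FakeServer[] = [];
const requests: Subscribe<string> = (next) => {
  const server = createFakeServer(next);
  servers.push(server);
  return () => server.close();
};

const seenRoutes: string[] = [];
const unsubscribe = requests((url) => seenRoutes.push(url));
const simulate = (url: string): void =>
  servers.forEach((server) => server.simulate(url));

simulate("/sum");
simulate("/_r/s-1");
unsubscribe();
simulate("/sum"); // dropped: unsubscribing closed the server
```

Only the two requests made before unsubscribing reach the subscriber, which is the same lifecycle httpRequests gets from server.listen and server.close.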
3.4 Connecting everything with the router
For convenience here is the last part of the example application again:
void httpRequests(8000).subscribe(
  router({
    "/sum": ExampleApp
  })
);
The Observer we subscribe to our httpRequests Observable is the return value of the enigmatic router function. We’ll define it first and ask questions later.
type Routes = Record<Route, new (r: Registry) => Handler>;
function router(routes: Routes) {
  routes["_r/:id"] = class extends Handler<"id"> {
    async handle(request: RequestData, params: RouteParams<"id">) {
      const resume = this.suspensions.lookup(params.id);
      void (resume
        ? resume(request)
        : request.replyError(404, {
            error: `No continuation for ${params.id}.`
          }));
    }
  };
  return {
    /* This method defines an Observer. */
    async next(request: RequestData) {
      let handler, template;
      for (const [_template, _handler] of Object.entries(routes)) {
        if (matchesPath(_template, request.route)) {
          template = _template;
          handler = _handler;
          break;
        }
      }
      if (!handler || !template) {
        request.replyError(404, {
          error: `No handler found for route ${request.route}`
        });
        return;
      }
      try {
        await new handler(this).handle(
          request,
          pathToObject(template, request.route)
        );
      } catch (err) {
        request.replyError(500, {
          error: String(err)
        });
      }
      return;
    },
    /* These properties and methods define a Registry. */
    _store: {} as Record<Identifier, Suspension>,
    _unique: 0,
    register(suspension: Suspension): Identifier {
      const id = `s-${++this._unique}`;
      this._store[id] = suspension;
      return id;
    },
    lookup(route: Identifier): Suspension | null {
      return this._store[route] ?? null;
    },
    remove(route: Identifier): void {
      delete this._store[route];
    }
  };
}
type Router = ReturnType<typeof router>;
Let’s review this.
3.4.1 The resumption Handler
We first insert a special handler for resuming our suspended continuations. The author is pleased he was able to implement this feature using the library itself.
Requests to /_r/:id, where id is a unique string, will invoke the lookup method on Registry to fish out the Suspension and invoke it with the fresh RequestData.
If there is no suspended session a 404 is sent back to the client. Otherwise it is invoked with its argument and original execution will resume.
3.4.2 The RequestData Observer
The next method is what makes the Router object an Observer of RequestData objects. It begins by looking up a handler which matches the requested route. If found the Handler is instantiated; if not, again a 404 is sent as the reply.
The helper functions which marshal parameters out of the route template are included at the end in the Appendix.
3.4.3 The Registry
In addition to being an Observer, the Router is also our Registry, as evidenced by it injecting itself into each Handler it instantiates.
Obviously there are reasons to split this Router object up into multiple separate types with separate concerns. The interface guarantees that client code will be called out by the type checker if it tries to do anything besides lookup, register, and remove suspensions, but this really only helps you prevent mistakes in your code; at runtime other people’s code does not have to ask tsc for permission to do anything, of course.
Fortunately this design makes it very easy to modify the router function without updating existing applications which use the library or overhauling the rest of the library (not that “the rest” comprises very much).
4 Conclusion
This essay is not meant to be a substitute for the original paper. The author does not have the education, skill, or disposable time to accomplish such. However, by achieving what it does in such a small amount of code which can be run directly with ts-node, I hope I have demonstrated both a powerful technique for structuring web services as well as a novel correspondence between two subjects which are not often discussed together: reactivity and continuations.
5 Appendix: RequestData and route parameters
All of the code above implies a RequestData object which provides structured access to the incoming request data as well as a means of replying. It is not terribly surprising and is presented here for completeness and to show there is nothing up my proverbial sleeves.
Additionally I have provided the remaining helpers used by the above code to parse request routes. It’s clever and worth looking at but not the focus of this essay.
import { ServerResponse } from "http";

type Route = string;
class RequestData {
  public readonly queryParameters: Record<string, string>;
  constructor(
    public readonly route: Route,
    public readonly response: ServerResponse,
    public readonly data: string,
    queryString: string
  ) {
    this.queryParameters = Object.fromEntries(
      queryString.split(/&/).map((x) => x.split(/=/))
    );
  }
  public writeResponse(
    statusCode: number,
    payload: string,
    contentType = "application/json"
  ) {
    this.response.writeHead(statusCode, { "Content-Type": contentType });
    this.response.end(payload);
  }
  protected serializeResponse(message?: unknown): string {
    return "undefined" === typeof message
      ? ""
      : "string" === typeof message
      ? message
      : JSON.stringify(message);
  }
  public replyOk(message?: unknown): void {
    const payload = this.serializeResponse(message);
    return this.writeResponse(message ? 200 : 204, payload);
  }
  public replyError(statusCode: number, message?: unknown): void {
    const payload = this.serializeResponse(message);
    return this.writeResponse(statusCode, payload);
  }
  public parse<T extends object>(): T {
    return JSON.parse(this.data) as T;
  }
}
// Matches ":name" segments in a route template.
const captureVariables = /:(\w*)/gi;
function templateToRegExp(template: string) {
  return new RegExp(
    template.replace(
      captureVariables,
      (_, variable) => `(?<${variable}>[^/]*)`
    ) + "$",
    "gi"
  );
}
function matchesPath(template: string, pathname: string) {
  return templateToRegExp(template).test(pathname);
}
function pathToObject(template: string, pathname: string) {
  return (
    templateToRegExp(template).exec(decodeURIComponent(pathname))?.groups ?? {}
  );
}