Description Of Wonka Operators

15 Jul 2024 - Ben

Suffice it to say, wonka operators are a simple concept that becomes incredibly tricky very quickly! This post is about wonka streams and callbags: a fusion of Observables and Iterators.
If anything in this post confuses you, I’d recommend reading up on those first; there isn’t enough room here to go over everything I’d like to. The best metaphor I have for this model is a pipeline or a river system.

Operators let us transform the stream of data that comes through. In the wonka and callbag specs these are very simple-looking functions that receive other functions and sometimes call other functions. All these functions flying around are confusing! Yes, they are all the same JavaScript construct, a function, but they play very different roles.

An operator is a function that returns a function

An operator is a function that accepts a source function and returns a sink function

An operator is a function that accepts an upstream function and returns a downstream function

The upstream function should be called with a callback function. That callback will be called with a signal and doesn’t return anything. The upstream function decides when the callback is called, and can call it as many times as it likes. It can signal to us that data will begin flowing (START), that it has data to give us (PUSH), and that it has run out of data (END).

source((signal) => {/* could be START, PUSH, or END */})
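To make the upstream side concrete, here is a minimal sketch of a source written against the calling convention used in this post. The signal constants and the `fromValues` name are placeholders invented for illustration, not wonka’s actual exports, and this source pushes everything immediately instead of waiting to be pulled.

```javascript
// Assumed signal constants; the post never defines them, so these values are made up.
const START = 0, PUSH = 1, END = 2;

// Hypothetical source: calls the callback with START, then PUSH once per value, then END.
function fromValues(values) {
  return function source(callback) {
    callback(START, () => {}); // stub talkback: this source ignores PULL and CLOSE
    for (const value of values) callback(PUSH, value);
    callback(END);
  };
}

// Record every signal the source sends us.
const received = [];
fromValues(["a", "b"])((signal, payload) => {
  if (signal === START) received.push("START");
  else if (signal === PUSH) received.push("PUSH " + payload);
  else received.push("END");
});
console.log(received.join(", ")); // START, PUSH a, PUSH b, END
```

The source alone decides how many times the callback runs; the callback only reacts.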


The sink you are handed is the function downstream of you, and you call it with a signal whenever your operator is ready. You call it with the START, PUSH, and END signals to talk to the next function in the pipeline.

return (sink) => {
  sink(START, /* talkback */);
  sink(PUSH, /* data */);
  sink(END);
}


When an upstream function calls the downstream function with START, it should also provide a function through which the downstream function can tell it that it’d like more data (PULL) or that it’s done receiving data (CLOSE). This is usually called the talkback function.

return (sink) => {
  sink(START, (talkbackSignal) => {/* could be PULL or CLOSE */})
}
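As a sketch of how a talkback might be used, the source below only pushes when it is pulled and goes quiet once it is closed. The constants and the `fromArrayOnDemand` name are assumptions made for this example; real wonka sources are built differently.

```javascript
const START = 0, PUSH = 1, END = 2; // assumed downstream signals
const PULL = 0, CLOSE = 1;          // assumed talkback signals

// Hypothetical pull-driven source: pushes one item per PULL and stops on CLOSE.
function fromArrayOnDemand(items) {
  return function source(sink) {
    let index = 0;
    let done = false;
    sink(START, function talkback(talkbackSignal) {
      if (done) return; // already closed or ended, so ignore further requests
      if (talkbackSignal === CLOSE) {
        done = true;
      } else if (talkbackSignal === PULL) {
        if (index < items.length) {
          sink(PUSH, items[index++]);
        } else {
          done = true;
          sink(END);
        }
      }
    });
  };
}

// A sink that keeps the talkback, pulls twice, and then closes.
const seen = [];
let talkback = () => {};
fromArrayOnDemand([10, 20, 30])((signal, payload) => {
  if (signal === START) talkback = payload;
  else if (signal === PUSH) seen.push(payload);
});
talkback(PULL);
talkback(PULL);
talkback(CLOSE);
talkback(PULL); // ignored: the source was already closed
console.log(seen.join(", ")); // 10, 20
```

Here the downstream side sets the pace: nothing flows until it asks, and nothing flows after it says it is done.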


There is a lot of flexibility here… Our operator needs to call the source and call the sink; that’s basically it for a functioning operator. A well-behaved operator will also END when asked, probably sending that signal downstream, and forward a CLOSE signal upstream. But it might not, depending on how you want to control the flow of data through your operator. That’s the beauty: you have the flexibility to manipulate the data any way you see fit. But it also means that it is on you to explain how your operator transforms the flow through it, and what your operator does when it receives each of the different signals.
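One way to picture a well-behaved operator is a `take`-style operator that cuts the flow off early: once it has seen enough values it sends END downstream and CLOSE upstream. This is a sketch under the post’s simplified `sink(signal, payload)` convention with made-up constants, it assumes `max` is at least 1, and the `counter` source exists only for the demo.

```javascript
const START = 0, PUSH = 1, END = 2; // assumed downstream signals
const PULL = 0, CLOSE = 1;          // assumed talkback signals

// Hypothetical operator: let at most `max` values through, then shut both sides down.
function take(max) {
  return function operator(upstream) {
    return function downstream(sink) {
      let ended = false;
      let taken = 0;
      let talkback = () => {};

      upstream((signal, payload) => {
        if (ended) return; // ignore anything that arrives after we ended
        if (signal === START) {
          talkback = payload;
          // hand the sink its own talkback, which forwards to our source
          sink(START, (talkbackSignal) => {
            if (ended) return;
            if (talkbackSignal === CLOSE) { ended = true; talkback(CLOSE); }
            else talkback(PULL);
          });
        } else if (signal === END) {
          ended = true;
          sink(END);
        } else if (taken++ < max) {
          sink(PUSH, payload);
          if (!ended && taken >= max) {
            ended = true;
            sink(END);       // tell downstream no more data is coming
            talkback(CLOSE); // tell upstream to stop producing
          }
        }
      });
    };
  };
}

// Demo source (also hypothetical): pushes 1, 2, 3, ... until it is closed.
function counter(sink) {
  let closed = false;
  sink(START, (talkbackSignal) => { if (talkbackSignal === CLOSE) closed = true; });
  for (let n = 1; !closed && n <= 1000; n++) sink(PUSH, n);
  if (!closed) sink(END);
}

const log = [];
take(2)(counter)((signal, payload) => {
  if (signal === PUSH) log.push(payload);
  if (signal === END) log.push("END");
});
console.log(log.join(", ")); // 1, 2, END
```

The `ended` flag is what keeps the operator honest: once it is set, late signals from either direction are dropped.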

[Diagram: an Operator between Upstream and Downstream, with the signals Start (Talkback), Push (Data), End, Pull, and Close]


That was my very high-level overview. Operators can be very confusing to look at at first, so let me leave you with a few examples:

// no-op operator: pass the sink we were given directly to the source provided, adding no behavior
function operator(source) { return (sink) => { source(sink); }; }
// the same no-op operator, written with named functions
function operator(upstream) { return function downstream(sink) { upstream(sink); }; }
// basic shell, non functional since I'm not handling any of the signals
function operator(upstream) {
  return function downstream(sink) {
    /* call the source */
    upstream(function ourSink(signal, payload) {
      /* do whatever we need with the START, PUSH, END signals */
    })

    sink(START, function talkback(talkbackSignal, payload) {
      /* do whatever we need with the PULL and CLOSE signals */
    });
  }
}
// no-op operator, but a basic shell of what could be
function operator(upstream) {
  return function downstream(sink) {
    /* now that we have the sink */

    // per-subscription state, so every sink gets its own copy
    let ended = false;
    let talkback = () => {};

    /* call the source  */
    upstream((signal, payload) => {
      if(ended) {/* this whole call will be a no-op if we've received an END signal already */ }
      else if(signal === END) {
        ended = true;
        sink(END); // pass END along so the sink knows the data has run out
        /* what else might you want to teardown when it ends? */
      }
      else if(signal === START) {
        talkback = payload; // hold a reference to the talkback function, so we can talk to our source
        /* what else might you want to start to be ready to receive data? */
      }
      else {
        /* do things here with Push signal */

        // would send the push signal we receive from upstream directly to downstream,
        // not typically useful by itself.
        sink(signal, payload); 
        // cachedData = payload; // maybe store the data locally, so we can send it down later?
      }
    });

    // remember to start the sink so we can give it our talkback function!
    sink(START, (signal, payload) => {
      // since this is the talkback, signal will be CLOSE or PULL
      if(signal === CLOSE) {
        talkback(CLOSE);
      } else {
        // do something with the PULL signal
        talkback(PULL);  // maybe ask the source for more data?
        // sink(PUSH, cachedData); // maybe push some locally cached data down right away, instead?
      }
    });

  }
}
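// map operator: apply mapper to every pushed value
// (Source, Sink, and the signal constants are assumed to be declared elsewhere)
function map<X, Y>(mapper: (x: X) => Y) {
  return function operator(upstream: Source<X>) {
    return function downstream(sink: Sink<Y>) {
      let ended = false;
      let talkback = () => {};
      upstream((signal, payload) => {
        if(ended) {/* ignore everything after END */}
        else if(signal === END) {
          ended = true;
          sink(END); // pass END along so the sink knows the data has run out
        }
        else if(signal === START) {talkback = payload}
        else {
          sink(PUSH, mapper(payload)); // the only real work: transform each value on its way down
        }
      });
      sink(START, (signal, payload) => {
        if(signal === CLOSE) {
          talkback(CLOSE);
        } else {
          talkback(PULL);
        }
      })
    }
  }
}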

// usage:
pipe(fromArray([1, 2, 3]), map(x => x + 1), subscribe(console.log)); // prints 2, 3, 4 each on their own line
// looks weird without the pipe though...
subscribe(console.log)(map(x => x + 1)(fromArray([1, 2, 3])));
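To see why the two usage lines above are equivalent, it may help to run the whole pipeline with stand-ins. Everything below is a simplified reimplementation under this post’s `sink(signal, payload)` convention: the constants are made up, and `pipe`, `fromArray`, `map`, and `subscribe` are hypothetical sketches that share names with wonka’s exports but are not its real code.

```javascript
const START = 0, PUSH = 1, END = 2; // assumed downstream signals
const PULL = 0, CLOSE = 1;          // assumed talkback signals

// Stand-in for pipe: feed the source through each function, left to right.
function pipe(source, ...fns) {
  return fns.reduce((current, fn) => fn(current), source);
}

// Stand-in source: pushes one item per PULL, then sends END.
function fromArray(items) {
  return (sink) => {
    let index = 0;
    let done = false;
    sink(START, (talkbackSignal) => {
      if (done) return;
      if (talkbackSignal === CLOSE) done = true;
      else if (index < items.length) sink(PUSH, items[index++]);
      else { done = true; sink(END); }
    });
  };
}

// Same shape as the map operator above, minus the type annotations.
function map(mapper) {
  return (upstream) => (sink) => {
    let ended = false;
    let talkback = () => {};
    upstream((signal, payload) => {
      if (ended) return;
      if (signal === END) { ended = true; sink(END); }
      else if (signal === START) talkback = payload;
      else sink(PUSH, mapper(payload));
    });
    sink(START, (talkbackSignal) => talkback(talkbackSignal));
  };
}

// Stand-in consumer: pulls on START and again after every PUSH.
function subscribe(onValue) {
  return (source) => {
    let talkback = () => {};
    source((signal, payload) => {
      if (signal === START) { talkback = payload; talkback(PULL); }
      else if (signal === PUSH) { onValue(payload); talkback(PULL); }
    });
  };
}

const piped = [];
pipe(fromArray([1, 2, 3]), map((x) => x + 1), subscribe((v) => piped.push(v)));

const nested = [];
subscribe((v) => nested.push(v))(map((x) => x + 1)(fromArray([1, 2, 3])));

console.log(piped.join(", "));  // 2, 3, 4
console.log(nested.join(", ")); // 2, 3, 4
```

In this sketch `pipe` is nothing more than a left-to-right reduce, which is why the nested form reads inside out but behaves identically.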


Additional reading: