CONNECTIVE v RxJS

CONNECTIVE is NOT an alternative to RxJS. It is designed to complement what RxJS offers by providing a different model of reactive flows more suitable to larger and more complex cases. This also means that you cannot fully utilize CONNECTIVE without using it in tandem with RxJS.

CONNECTIVE complements RxJS in three areas:

Deferred construction

In RxJS, each observable is created by combining and transforming some already instantiated observables. This limits description of your flow to a strict sequential order, which creates issues when you have large flows expanding over multiple files/modules.

CONNECTIVE on the other hand allows describing the flow in any order, allowing you to organise large flows into multiple modules seamlessly.

For example, imagine one module describing the beginning and the end of a flow, while multiple others would connect this beginning and the end in various manners. A real-life example of this would be a module to feed incoming http requests to the flow, and then create proper http responses and send them accordingly.

// Main Module
let flowStart = someSource().to(...).to(...);
let flowEnd = pin();
flowEnd.to(...);

export { flowStart, flowEnd };
// Other Modules
import { flowStart, flowEnd } from './main';

flowStart.to(...).to(...).to(flowEnd);
// Main Module
let flowStart$ = Observable().pipe(...);
let middleMen = [];

export function fillInTheMiddle(fillerFunc) {
  middleMen.push(fillerFunc(flowStart));
}

export function flowEnd() {
  return merge(...middleMen).pipe(...);
}
// Other Modules
import { fillInTheMiddle } from './main';

fillInTheMiddle(flowStart$ => {
  return flowStart$.pipe(...)
});


Modelling the flow

In RxJS, a reactive flow is modelled as a sequence of mergers and transformations of some observables. This leads to the famous observable sequence model, which despite its name is mostly a series of related inverse trees.

However, with increase of complexity of the reactive flows, they typically take on the shape of a generic directed graph. This disparity can lead to extra overhead code and reduced readability. This is why in CONNECTIVE reactive flows are modelled exactly as directed graphs, to keep the model of the flow and the code describing it as close as possible.

a.to(X).to(Y)
  .to(b, c).serialTo(d, e)
  .to(Z)
let a0 = a.pipe(X, Y);
let Z = merge(
  a0.pipe(b, d), 
  a0.pipe(c, e)
).pipe(...);


Flow re-use

When reactive flows grow in size and complexity, there is an absolute necessity for a standard abstraction facilitating re-use of flows. RxJS does not offer any such abstraction beyond custom pipeable operators, which due to them being (typically) transformations of only a single observable to another single observable limits their applicability as such an abstraction (its like if functions could only be defined with one parameter).

CONNECTIVE offers a much more widely applicable abstraction for re-use: Agent, which models any re-usable part of a flow as a black box with some incoming pins and some outgoing pins. It also offers a really simple and straight-forward mechanism to build custom agents in form of compositions, which are agents whose internals are also partial reactive flows.

export const myComp = composition(() => {
  let a = pin(), b = pin(); //--> inputs
  let c = pin(), d = pin(); //--> outputs

  ...                       //--> connect stuff together

  return [{ a, b }, { c, d }];
});
import { myComp } from './wherever';

group(x, y).serialTo(myComp()).to(z);
export function myThing(a: Observable, b: Observable) {
  let c: Observable, d: Observable; //--> outputs

  ...                               //--> build the outputs out of the inputs

  return [c, d];
}
import { myThing } from './wherever';

let [z1, z2] = myThing(x, y);
let z = merge(z1, z2).pipe(...);


A more complete example

The following example puts all the aforementioned issues on display in a more practical scenario:

/*** click-counter.ts ***/
import { composition, pin, map, pipe, state, check, expr, sampler } from '@connectv/core';
import { debounceTime, delay } from 'rxjs/operators';


export const clickCounter = composition(track => {
  let click = pin(), reset = pin();
  let out = pin(), single = pin(), multi = pin();

  let count = state(0); track(count);
  let incr = expr(x => x + 1), docheck = sampler();

  count.to(incr).to(count).to(out, docheck);
  reset.to(map(() => 0)).to(count);

  docheck.to(check(x => x > 1)).serialTo(multi, single);

  click.to(incr.control);
  click.to(pipe(debounceTime(200)))
    .to(reset.from(pipe(delay(1))), docheck.control);

  return [{ click, reset }, { out, single, multi }];
});
/*** index.ts ***/
import { wrap, map, filter } from '@connectv/core';
import { fromEvent } from 'rxjs';

import { clickCounter } from './click-counter';

let p = document.getElementById('p');

wrap(fromEvent(document, 'click')).serialTo(clickCounter())
.serialTo(filter(x => x > 0), map(() => 'single click'), map(() => 'MULTI CLICK!'))
.subscribe(v => p.innerHTML = v);
    
/*** click-counter.ts ***/
import { Observable, BehaviorSubject, merge, NEVER, defer } from 'rxjs';
import { map, sample, tap, debounceTime, delay, mapTo, 
        partition, distinctUntilChanged} from 'rxjs/operators';


export function clickCounter(clickObs$, resetObs$?, track?) {
  let count = new BehaviorSubject(0);

  let timer = clickObs$.pipe(debounceTime(200));
  let [multi, single] = count.pipe(
    sample(timer),
    //@ts-ignore
    partition(x => x > 1)
  );

  let deferred = false;
  let sub;
  let _c = defer(() => {
    if (!deferred) {
      sub = merge(
        count.pipe(sample(clickObs$), map(x => x + 1)),
        count.pipe(sample(timer), delay(1), mapTo(0)),
        (resetObs$ || NEVER).pipe(mapTo(0))
      ).subscribe(count);

      deferred = true;
    }

    return count;
  }).pipe(distinctUntilChanged());

  if (track) track(sub); 
  return [_c, single, multi];
}
/*** index.ts ***/
import { fromEvent, merge } from 'rxjs';
import { filter, mapTo } from 'rxjs/operators';

import { clickCounter } from './click-counter';

let p = document.getElementById('p');

let [count$, single$, multi$] 
  = clickCounter(fromEvent(document, 'click'));

merge(
  count$.pipe(filter(x => x > 0)),
  single$.pipe(mapTo('single click')),
  multi$.pipe(mapTo('MULTI CLICK!'))
).subscribe(v => p.innerHTML = `${v}`);
    
Although the differences between CONNECTIVE and RxJS get more and more pronounced with increase in size and complexity of flows, the difference is already evident in simpler cases still matching the essential criteria outlined above. It is also evident how these issues would cause increased amounts of code-overhead and noise the bigger the project becomes.

NOTICE: what is outlined in this example is obviously NOT the most efficient way of counting clicks and distinguish single clicks from multi-clicks, neither in RxJS nor in CONNECTIVE. It is just written as such to highlight the aforementioned phenomena.




Copied to Clipboard!