Under the hood

CONNECTIVE is a thin layer on top of RxJS. At its base, it offers a more intuitive interface for creating observable sequences by encapsulating each node in a sequence as a pin, which can then be connected to other pins in whatever fashion suits your situation.

More specifically, each pin can be thought of as a lazy observable of emissions (Observable<Emission>). This observable is accessible via a pin's .observable property. It is not resolved until it is needed, which allows you to define pins and connect them to each other freely.

Observable resolution

The .observable of a pin is resolved when one of the following happens:

Resolution process

The observable resolution of a pin, say P, typically involves the following steps:
  1. The .observables of all pins connected to P are fetched,
  2. These observables are combined using a combination operator, typically merge(), though for example control() uses zip(),
  3. The combined observable is transformed by some other pipeable operators. For example, map() might pipe this combined observable either through RxJS's map() operator or through its mergeMap() operator, depending on whether its transformation function is sync or async,
  4. The transformed observable is set as the .observable of the pin.
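The four steps above can be sketched in a few lines. This is a simplified illustration, not CONNECTIVE's actual implementation: SketchPin, Stream and mergeAll are hypothetical names, and a hand-rolled stream type stands in for RxJS so that the example has no dependencies.

```typescript
// A tiny stand-in for an observable: calling it with a listener starts delivery.
type Listener<T> = (value: T) => void;
type Stream<T> = (listener: Listener<T>) => void;

// Stand-in for merge(): forwards the values of every given stream.
function mergeAll<T>(streams: Stream<T>[]): Stream<T> {
  return listener => streams.forEach(stream => stream(listener));
}

// Hypothetical pin: resolves its stream lazily, on first access.
class SketchPin {
  private inbound: SketchPin[] = [];
  private resolved?: Stream<number>;
  private transform: (x: number) => number;
  private origin?: Stream<number>; // only set for source-like pins

  constructor(transform: (x: number) => number = x => x, origin?: Stream<number>) {
    this.transform = transform;
    this.origin = origin;
  }

  // Registers the given pins as inbound connections of this pin.
  from(...pins: SketchPin[]): this {
    this.inbound.push(...pins);
    return this;
  }

  get observable(): Stream<number> {
    if (this.resolved) return this.resolved;
    // 1. Fetch the observables of all connected pins (resolving them first).
    const fetched = this.inbound.map(pin => pin.observable);
    // 2. Combine them (here: merge, plus the pin's own origin if it has one).
    const combined = mergeAll(this.origin ? [this.origin, ...fetched] : fetched);
    // 3. Transform the combined stream.
    const transform = this.transform;
    const transformed: Stream<number> = listener => combined(x => listener(transform(x)));
    // 4. Store the result as this pin's observable.
    this.resolved = transformed;
    return transformed;
  }
}

const first = new SketchPin(x => x, listener => [1, 2].forEach(x => listener(x)));
const second = new SketchPin(x => x, listener => listener(10));
const doubled = new SketchPin(x => x * 2).from(first, second);

const received: number[] = [];
doubled.observable(x => received.push(x)); // resolution happens here, not earlier
console.log(received); // the values 2, 4 and 20
```

Nothing is combined or transformed until doubled.observable is read, which is what lets pins be defined and connected in any order beforehand.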

Propagation and locking

Because of the process outlined above, once the .observable property of P is resolved, it cannot be changed. At this point P is locked (P.locked == true). Since the resolution of the observable depends only on the pins connected to P, you can no longer connect other pins to P, though you can still connect P to other unlocked pins.

This also means that for P to be resolved, all pins connected to P must also be resolved (and hence locked) first. This behavior means that at any point in time exactly the part of the flow that needs to be locked is locked and the rest can still be modified.
import { pin } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();

a.to(b, c);

b.subscribe();         //--> so we will lock b
console.log(a.locked); //--> a is locked as well
console.log(c.locked); //--> but c is not.
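The locking rule itself can be modelled without any observables. The following is a minimal sketch under stated assumptions: LockablePin and its resolve() method are hypothetical stand-ins for a pin and for the resolution of its .observable, the flow is assumed to be acyclic, and throwing a plain Error is only an assumption about how a refused connection is reported.

```typescript
// Hypothetical model of locking: resolving a pin locks it and everything upstream.
class LockablePin {
  private inbound: LockablePin[] = [];
  private isLocked = false;

  get locked(): boolean {
    return this.isLocked;
  }

  // Connects this pin to each target (this -> target).
  to(...targets: LockablePin[]): this {
    for (const target of targets) {
      // A locked target already has a fixed observable, so it accepts no new inbound pins.
      if (target.locked) throw new Error('cannot connect to a locked pin');
      target.inbound.push(this);
    }
    return this;
  }

  // Stand-in for resolving .observable: upstream pins are resolved (and locked) first.
  resolve(): void {
    if (this.isLocked) return;
    this.inbound.forEach(pin => pin.resolve());
    this.isLocked = true;
  }
}

const p = new LockablePin();
const q = new LockablePin();
const r = new LockablePin();

p.to(q, r);
q.resolve(); // locks q and, through it, p

console.log(p.locked, q.locked, r.locked); // true true false

let refused = false;
try {
  new LockablePin().to(q); // q is locked, so nothing new can flow into it
} catch {
  refused = true;
}

p.to(new LockablePin()); // still allowed: p is locked, but the target is not
```

Only the part of the flow upstream of the resolved pin is frozen; r and any later targets of p remain open to new connections.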

Subscribing

Calling the .subscribe() method of a pin subscribes the provided functions to the underlying observable (and hence causes its resolution). This is why the .subscribe() method of a pin has the exact same signature as the .subscribe() method of an Observable.

The inner observable of a pin is not a generic observable but of type Observable<Emission>. So when you subscribe to .observable directly, you will receive emission objects instead of raw values. When you call .subscribe() on the pin, it maps the emission objects to their raw values before passing them to your subscription functions.
import { source, map } from '@connectv/core';

let a = source();
let m = a.to(map(x => x * 2));

m.subscribe(console.log);            //--> logs raw values
m.observable.subscribe(console.log); //--> logs emissions

a.send(2);
a.send(42);
The pin's .subscribe() method also returns the exact same Subscription object, which you can use to unsubscribe later. The pin itself will hold a reference to this subscription and unsubscribe when its .clear() method is invoked.
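The difference between the two kinds of subscription, and the role of the returned subscription, can be illustrated with a small model. Everything here is hypothetical: UnwrappingPin, subscribeToEmissions() and emit() are illustrative names, the SketchEmission shape (a value plus a context object) is an assumption about what an emission carries, and a plain function stands in for a Subscription object.

```typescript
// Assumed shape of an emission: the raw value plus some accompanying context.
interface SketchEmission<T> {
  value: T;
  context: Record<string, unknown>;
}

type Listener<T> = (x: T) => void;

class UnwrappingPin<T> {
  private listeners: Listener<SketchEmission<T>>[] = [];

  // Stand-in for pin.observable.subscribe(): listeners receive whole emissions.
  subscribeToEmissions(listener: Listener<SketchEmission<T>>): () => void {
    this.listeners.push(listener);
    // The returned function plays the role of Subscription.unsubscribe().
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Stand-in for pin.subscribe(): emissions are mapped to raw values first.
  subscribe(listener: Listener<T>): () => void {
    return this.subscribeToEmissions(emission => listener(emission.value));
  }

  // Wraps a raw value in an emission and delivers it to all current listeners.
  emit(value: T): void {
    const emission: SketchEmission<T> = { value, context: {} };
    this.listeners.forEach(l => l(emission));
  }
}

const m = new UnwrappingPin<number>();
const rawValues: number[] = [];
const emissions: SketchEmission<number>[] = [];

const unsubscribe = m.subscribe(v => rawValues.push(v));
m.subscribeToEmissions(e => emissions.push(e));

m.emit(4);
unsubscribe(); // the raw-value listener stops receiving from here on
m.emit(84);

console.log(rawValues);                   // only the value 4
console.log(emissions.map(e => e.value)); // the values 4 and 84
```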

Binding

Calling the .bind() method on pins that have it is exactly the same as calling .subscribe() without any parameters. Calling it on an agent (for example, state()) also entails the .subscribe() method being invoked on some of its internal pins. Be careful as this might result in .subscribe() being invoked on an internal observable multiple times.

Observable sharing

By default, the resolved observables ARE NOT SHARED. This means that when a pin is connected to multiple other pins, or when a pin is subscribed to multiple times, its side effects will be executed multiple times.
import { source, map, pin, control } from '@connectv/core';

let a = source();

a.to(map(x => {
  console.log('SIDE EFFECT!');   //--> so this side effect will be called multiple times
  return x;
}))
.to(pin(), pin())                //--> because of this
.to(control())
.subscribe(console.log);

a.send(2);
a.send(3);
This is why your sync map() functions must be side-effect-free, as otherwise their side-effects might be invoked multiple times without you properly realising it.

However, in some cases the observable is shared. These are typically cases where it is important to avoid undesired side-effects. For example, the observables of async map() functions are shared, since async transformations generally involve some time-consuming process that should not be repeated unnecessarily.
import { source, map, pin, control } from '@connectv/core';

let a = source();

a.to(map((x, done) => {          //--> async function is passed
  console.log('SIDE EFFECT!');   //--> so this side effect will be called once
  done(x);
}))
.to(pin(), pin())
.to(control())
.subscribe(console.log);

a.send(2);
a.send(3);
The following pins and agents typically share their internal observables:

Note that this list might change over time, since making sharing the default behaviour is already under consideration for other situations as well.

You can always enforce sharing in your flow by using the pipe() pin together with RxJS's share() operator:
import { source, map, pin, control, pipe } from '@connectv/core';
import { share } from 'rxjs/operators';

let a = source();

a.to(map(x => {
  console.log('SIDE EFFECT!');   //--> side effect called once
  return x;
}))
.to(pipe(share()))               //--> because of this
.to(pin(), pin())
.to(control())
.subscribe(console.log);

a.send(2);
a.send(3);
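To see why sharing changes how often a side effect runs, it can help to model the behaviour without RxJS. The sketch below is only an illustration: Stream, createSource, mapStream and shareStream are hypothetical names, and shareStream is a deliberately simplified stand-in for share() that omits reference counting, completion and teardown.

```typescript
type Listener<T> = (value: T) => void;
// Each call to a stream sets up a fresh, independent chain of listeners.
type Stream<T> = (listener: Listener<T>) => void;

// Stand-in for a source: values are pushed to every listener that subscribed.
function createSource<T>() {
  const listeners: Listener<T>[] = [];
  const stream: Stream<T> = listener => {
    listeners.push(listener);
  };
  const send = (value: T): void => listeners.forEach(l => l(value));
  return { stream, send };
}

// Unshared transformation: fn runs once per subscriber for every value.
function mapStream<A, B>(source: Stream<A>, fn: (a: A) => B): Stream<B> {
  return listener => source(a => listener(fn(a)));
}

// Simplified sharing: subscribe upstream once, then fan values out to all listeners.
function shareStream<T>(source: Stream<T>): Stream<T> {
  const listeners: Listener<T>[] = [];
  let connected = false;
  return listener => {
    listeners.push(listener);
    if (!connected) {
      connected = true;
      source(value => listeners.forEach(l => l(value)));
    }
  };
}

// Without sharing: two subscribers, so the side effect runs twice per value.
let unsharedCount = 0;
const plain = createSource<number>();
const unshared = mapStream(plain.stream, x => { unsharedCount++; return x; });
unshared(() => {});
unshared(() => {});
plain.send(2);

// With sharing: two subscribers, but the side effect runs once per value.
let sharedCount = 0;
const other = createSource<number>();
const shared = shareStream(mapStream(other.stream, x => { sharedCount++; return x; }));
shared(() => {});
shared(() => {});
other.send(2);

console.log(unsharedCount, sharedCount); // 2 1
```

The only difference between the two halves is the single upstream subscription made by shareStream, which is the role pipe(share()) plays in the flow above.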


