Under the hood
CONNECTIVE is a thin layer on top of RxJS. At its core, it offers a more intuitive interface for creating observable sequences: each node in a sequence is encapsulated as a pin, which can then be connected to other pins in whatever fashion suits your situation.
More specifically, each pin can be thought of as a lazy observable of emissions (Observable<Emission>). This observable is accessible via the pin's .observable property. It is not resolved until it needs to be, which allows you to define pins and connect them to each other freely.
Observable resolution
The .observable of a pin is resolved when one of the following happens:
- The .observable property is accessed,
- The .subscribe() method is invoked,
- The .bind() method is invoked (when available).
Resolution process
The observable resolution of a pin, say P, typically involves the following steps:
- The .observables of all pins connected to P are fetched,
- These observables are combined using a combination operator, typically merge(), though control(), for example, uses zip(),
- The combined observable is transformed by some other pipeable operators. For example, map() might pipe this combined observable through either RxJS's map() operator or its mergeMap() operator, depending on whether its transformation function is sync or async,
- The transformed observable is set as the .observable of the pin.
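The steps above can be sketched without RxJS or CONNECTIVE. In this minimal model an observable is just a function that takes a listener and returns an unsubscribe function; MiniPin, MiniSource, merge and mapValues are hypothetical names invented for this sketch, not part of either library.

```javascript
// A tiny stand-in for an observable: a function that takes a listener
// and returns an unsubscribe function. This is NOT RxJS.
function merge(sources) {
  return (listener) => {
    const stops = sources.map((source) => source(listener));
    return () => stops.forEach((stop) => stop());
  };
}

function mapValues(source, fn) {
  return (listener) => source((value) => listener(fn(value)));
}

// Hypothetical MiniPin, modelling the four resolution steps.
class MiniPin {
  constructor(transform = (x) => x) {
    this.transform = transform;
    this.inbound = [];    // pins connected to this pin
    this.resolved = null; // cached observable, set on first access
  }

  to(...pins) {
    pins.forEach((p) => p.inbound.push(this));
    return pins[0];
  }

  get observable() {
    if (!this.resolved) {
      const inputs = this.inbound.map((p) => p.observable); // step 1: fetch
      const combined = merge(inputs);                        // step 2: combine
      this.resolved = mapValues(combined, this.transform);   // steps 3 and 4
    }
    return this.resolved;
  }
}

// Hypothetical source pin: its observable is backed by a set of listeners.
class MiniSource extends MiniPin {
  constructor() {
    super();
    this.listeners = new Set();
    this.resolved = (listener) => {
      this.listeners.add(listener);
      return () => this.listeners.delete(listener);
    };
  }

  send(value) {
    this.listeners.forEach((listener) => listener(value));
  }
}

const a = new MiniSource();
const b = new MiniSource();
const doubled = new MiniPin((x) => x * 2);
a.to(doubled);
b.to(doubled);

const resolvedBefore = doubled.resolved !== null; // nothing resolved yet
const received = [];
doubled.observable((value) => received.push(value)); // resolution happens here
a.send(1);
b.send(5);
console.log(resolvedBefore, received); // false [ 2, 10 ]
```

Because the getter caches its result, the observable is built only on first access, so the pins can be wired in any order before that point.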
Propagation and locking
Because of the process outlined above, once the .observable property of P is resolved, it cannot be changed. At this point P is locked (P.locked == true). Since the resolution of the observable depends only on the pins connected to P, you can no longer connect other pins to P, though you can still connect P to other unlocked pins.
This also means that for P to be resolved, all pins connected to P must be resolved (and hence locked) first. As a result, at any point in time exactly the part of the flow that needs to be locked is locked, and the rest can still be modified.
import { pin } from '@connectv/core';
let a = pin();
let b = pin();
let c = pin();
a.to(b, c);
b.subscribe(); //--> so we will lock b
console.log(a.locked); //--> a is locked as well
console.log(c.locked); //--> but c is not.
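The locking rules can also be modelled in isolation. The following is a rough sketch that leaves observables out entirely. LockablePin is a hypothetical class, and throwing an error when connecting into a locked pin is an assumption of this sketch, not a statement of how CONNECTIVE reports it.

```javascript
// Hypothetical LockablePin: models only the locking rules, not observables.
class LockablePin {
  constructor(name) {
    this.name = name;
    this.inbound = [];
    this.locked = false;
  }

  to(...pins) {
    for (const target of pins) {
      // Assumption of this sketch: connecting INTO a locked pin is rejected.
      if (target.locked) throw new Error(`${target.name} is locked`);
      target.inbound.push(this);
    }
    return this;
  }

  resolve() {
    if (this.locked) return;
    this.inbound.forEach((p) => p.resolve()); // upstream pins resolve first
    this.locked = true;
  }
}

const a = new LockablePin('a');
const b = new LockablePin('b');
const c = new LockablePin('c');
const d = new LockablePin('d');
a.to(b, c);
b.resolve();

let rejected = false;
try {
  d.to(b); // b is locked, so it accepts no new inbound connections
} catch (error) {
  rejected = true;
}
b.to(c); // still fine: a locked pin can feed an unlocked one

console.log(a.locked, b.locked, c.locked, rejected); // true true false true
```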
Subscribing
Calling the .subscribe() method of a pin subscribes the provided functions to the underlying observable (and hence triggers its resolution). This is why the .subscribe() method of a pin has the exact same signature as the .subscribe() method of an Observable.
The inner observable of a pin is not a generic observable but of type Observable<Emission>. So when you subscribe to .observable directly, you will receive emission objects instead of raw values. When you call .subscribe() on the pin, it maps the emission objects to their raw values before passing them to your subscription functions.
import { source, map } from '@connectv/core';
let a = source();
let m = a.to(map(x => x * 2));
m.subscribe(console.log); //--> logs raw values
m.observable.subscribe(console.log); //--> logs emissions
a.send(2);
a.send(42);
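To make the difference between emissions and raw values concrete, here is a small library-free sketch. The { value, context } shape of an emission and the names emission, EmissionPin and subscribeToEmissions are assumptions made for illustration only.

```javascript
// Hypothetical stand-in for an emission: the value plus some context.
function emission(value, context = {}) {
  return { value, context };
}

class EmissionPin {
  constructor() {
    this.listeners = new Set();
  }

  // Plays the role of .observable.subscribe(): listeners get emissions.
  subscribeToEmissions(listener) {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }

  // Plays the role of the pin's .subscribe(): listeners get raw values.
  subscribe(listener) {
    return this.subscribeToEmissions((e) => listener(e.value));
  }

  send(value) {
    const e = emission(value);
    this.listeners.forEach((listener) => listener(e));
  }
}

const pin = new EmissionPin();
const raw = [];
const wrapped = [];
pin.subscribe((value) => raw.push(value));
pin.subscribeToEmissions((e) => wrapped.push(e));
pin.send(2);
pin.send(42);

console.log(raw);                         // [ 2, 42 ]
console.log(wrapped.map((e) => e.value)); // [ 2, 42 ]
```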
The pin's .subscribe() method also returns the exact same Subscription object, which you can use to unsubscribe later. The pin itself holds a reference to this subscription and unsubscribes when its .clear() method is invoked.
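The bookkeeping described here might look roughly like the following sketch. TrackingPin and its subscription objects are hypothetical and only imitate the idea of a pin remembering the subscriptions it hands out.

```javascript
// Hypothetical TrackingPin: remembers every subscription it hands out.
class TrackingPin {
  constructor() {
    this.listeners = new Set();
    this.subscriptions = [];
  }

  subscribe(listener) {
    this.listeners.add(listener);
    const subscription = {
      closed: false,
      unsubscribe: () => {
        subscription.closed = true;
        this.listeners.delete(listener);
      },
    };
    this.subscriptions.push(subscription);
    return subscription; // the caller can unsubscribe on its own
  }

  send(value) {
    this.listeners.forEach((listener) => listener(value));
  }

  clear() {
    this.subscriptions.forEach((s) => s.unsubscribe());
    this.subscriptions = [];
  }
}

const pin = new TrackingPin();
const seen = [];
const first = pin.subscribe((v) => seen.push(`first:${v}`));
const second = pin.subscribe((v) => seen.push(`second:${v}`));

pin.send(1);
first.unsubscribe(); // caller-side unsubscribe
pin.send(2);
pin.clear();         // pin-side cleanup of whatever is left
pin.send(3);

console.log(seen); // [ 'first:1', 'second:1', 'second:2' ]
```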
Binding
Calling the .bind() method on pins that have it is exactly the same as calling .subscribe() without any parameters. Calling it on an agent (for example, state()) also entails the .subscribe() method being invoked on some of its internal pins. Be careful, as this might result in .subscribe() being invoked on an internal observable multiple times.
Observable sharing
By default, resolved observables ARE NOT SHARED. This means that when a pin is connected to multiple other pins, or when a pin is subscribed to multiple times, its side effects will be executed multiple times.
import { source, map, pin, control } from '@connectv/core';
let a = source();
a.to(map(x => {
  console.log('SIDE EFFECT!'); //--> so this side effect will be called multiple times
  return x;
}))
.to(pin(), pin()) //--> because of this
.to(control())
.subscribe(console.log);
a.send(2);
a.send(3);
This is why your sync map() functions must be side-effect-free; otherwise their side effects might be invoked multiple times without you realising it.
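The reason unshared observables repeat side effects can be shown with a small library-free model. Here createSource and mapped are hypothetical helpers, and the two subscribers stand in for the two downstream pins: each subscriber gets its own copy of the transformation, so the transformation runs once per subscriber for every value.

```javascript
// Hypothetical hot source: pushes each sent value to all current listeners.
function createSource() {
  const listeners = new Set();
  return {
    subscribe: (listener) => { listeners.add(listener); },
    send: (value) => listeners.forEach((listener) => listener(value)),
  };
}

// Unshared map: every subscriber gets its own copy of the transformation.
function mapped(upstream, fn) {
  return {
    subscribe: (listener) => upstream.subscribe((value) => listener(fn(value))),
  };
}

let sideEffects = 0;
const a = createSource();
const m = mapped(a, (x) => {
  sideEffects += 1; // stands in for console.log('SIDE EFFECT!')
  return x;
});

const first = [];
const second = [];
m.subscribe((value) => first.push(value));
m.subscribe((value) => second.push(value));

a.send(2);
a.send(3);
console.log(first, second, sideEffects); // [ 2, 3 ] [ 2, 3 ] 4
```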
However, in some cases the observable is shared. These are typically cases where it is important to avoid undesired side effects. For example, the observables of async map functions are shared, since async transformations generally involve some time-consuming process that should not be repeated unnecessarily.
import { source, map, pin, control } from '@connectv/core';
let a = source();
a.to(map((x, done) => { //--> async function is passed
  console.log('SIDE EFFECT!'); //--> so this side effect will be called once
  done(x);
}))
.to(pin(), pin())
.to(control())
.subscribe(console.log);
a.send(2);
a.send(3);
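One plausible way to tell a sync transformation from an async one is the number of parameters the function declares. The sketch below illustrates that idea only: applyTransform is a hypothetical helper, and whether CONNECTIVE makes the distinction exactly this way is an assumption.

```javascript
// Hypothetical helper: treats functions that declare a second (callback)
// parameter as async, and everything else as sync.
function applyTransform(fn, value, emit) {
  if (fn.length >= 2) {
    fn(value, emit); // async style: the function calls back when it is done
  } else {
    emit(fn(value)); // sync style: the return value is the result
  }
}

const results = [];
applyTransform((x) => x * 2, 21, (v) => results.push(v));
applyTransform((x, done) => done(x + 1), 41, (v) => results.push(v));

console.log(results); // [ 42, 42 ]
```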
The following pins and agents typically share their internal observables:
Note that this list might change over time, as there are already other situations under consideration in which sharing would become the default behaviour.
You can always enforce sharing in your flow by using the pipe() pin and RxJS's share() operator:
import { source, map, pin, control, pipe } from '@connectv/core';
import { share } from 'rxjs/operators';
let a = source();
a.to(map(x => {
  console.log('SIDE EFFECT!'); //--> side effect called once
  return x;
}))
.to(pipe(share())) //--> because of this
.to(pin(), pin())
.to(control())
.subscribe(console.log);
a.send(2);
a.send(3);
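To see why sharing removes the repeated side effect, here is a simplified, library-free model of the idea. The share function below is a hypothetical stand-in written for this sketch: it subscribes upstream at most once and fans values out, and unlike RxJS's share() it does no reference counting or unsubscription.

```javascript
// Hypothetical hot source: pushes each sent value to all current listeners.
function createSource() {
  const listeners = new Set();
  return {
    subscribe: (listener) => { listeners.add(listener); },
    send: (value) => listeners.forEach((listener) => listener(value)),
  };
}

// Unshared map: every subscriber gets its own copy of the transformation.
function mapped(upstream, fn) {
  return {
    subscribe: (listener) => upstream.subscribe((value) => listener(fn(value))),
  };
}

// Simplified share: one upstream subscription, fanned out to all listeners.
function share(upstream) {
  const listeners = new Set();
  let connected = false;
  return {
    subscribe: (listener) => {
      listeners.add(listener);
      if (!connected) {
        connected = true;
        upstream.subscribe((value) => listeners.forEach((l) => l(value)));
      }
    },
  };
}

let sideEffects = 0;
const a = createSource();
const shared = share(mapped(a, (x) => {
  sideEffects += 1; // stands in for console.log('SIDE EFFECT!')
  return x;
}));

const first = [];
const second = [];
shared.subscribe((value) => first.push(value));
shared.subscribe((value) => second.push(value));

a.send(2);
a.send(3);
console.log(first, second, sideEffects); // [ 2, 3 ] [ 2, 3 ] 2
```

Both subscribers still receive every value, but the transformation now runs once per value instead of once per subscriber.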