CONNECTIVE facilitates large-scale reactive programming in JavaScript/TypeScript.
It enables declarative creation of large and complex data/event flows and encourages re-use of flows.
npm i @connectv/core
A Hellow World! example:
/*!*/import { wrap, map, filter } from '@connectv/core';
import { fromEvent } from 'rxjs';
let a = document.getElementById('a') as HTMLInputElement;
let p = document.getElementById('p');
//
// Will say hello to everyone but 'Donald'.
// For obvious reasons.
//
/*!*/wrap(fromEvent(a, 'input')) // --> wrap the `Observable` in a `Pin`
/*!*/.to(map(() => a.value)) // --> map the event to value of the input
/*!*/.to(filter(name => name != 'Donald')) // --> filter 'Donald' out
/*!*/.to(map(name => 'hellow ' + name)) // --> add 'hellow' to the name
/*!*/.subscribe(msg => p.innerHTML = msg); // --> write it to the <p> element
A more elaborate example:
/*!*/import { wrap, pipe, map, filter, sink } from '@connectv/core';
import { fromEvent } from 'rxjs';
import { delay, debounceTime } from 'rxjs/operators';
let a = document.getElementById('a');
let p = document.getElementById('p');
//
// Will calculate fibonacci sequence up to given index, displaying every number in the
// sequence along the way.
//
// --> calculate next iteration step on fibonacci sequence
/*!*/let m = map(([next, prev, l]) => [next + prev, next, l - 1]);
/*!*/wrap(fromEvent(a, 'input')) // --> wrap the `Observable` in a `Pin`
/*!*/.to(pipe(debounceTime(1000))) // --> wait for people to type in the number
/*!*/.to(map(() => parseInt((a as any).value))) // --> map the input event to value of the input
/*!*/.to(map(n => [1, 0, n])) // --> map the number to start iteration
/*!*/.to(filter(([_, __, l]) => l >= 0)) // --> check if we should do any iteration
/*!*/.to(m) // --> calculate next step
/*!*/.to(pipe(delay(300))) // --> take a breath
/*!*/.to(filter(([_, __, l]) => l > 0)) // --> check if we should continue
/*!*/.to(m) // --> back to the loop
/*!*/.to(map(([_, f, __]) => f)) // --> btw, lets take each number in the sequence
/*!*/.to(sink(v => p.innerHTML = v)) // --> set the text of <p> to the fib number
/*!*/.subscribe(); // --> bind the whole thing.
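To make the loop above easier to follow, here is a plain TypeScript trace of the same steps, without CONNECTIVE or RxJS. The names `Step`, `step` and `traceFib` are made up for this sketch, and it runs synchronously, so it ignores the debounce and the delay; it only lists which values would reach the final sink for a given input.

```typescript
type Step = [number, number, number]; // [next, prev, remaining]

// Same iteration step as the `m` pin in the flow above.
const step = ([next, prev, l]: Step): Step => [next + prev, next, l - 1];

// Hypothetical helper: lists the values the sink would receive for input n.
function traceFib(n: number): number[] {
  const shown: number[] = [];
  let state: Step = [1, 0, n];
  if (!(state[2] >= 0)) return shown; // first filter: should we iterate at all?
  state = step(state);                // first pass through m
  shown.push(state[1]);               // map(([_, f, __]) => f)
  while (state[2] > 0) {              // second filter: should we continue?
    state = step(state);              // loop back to m
    shown.push(state[1]);
  }
  return shown;
}

console.log(traceFib(5)); // [1, 1, 2, 3, 5]
```

Note that an empty input parses to `NaN`, which fails the first check, so nothing is shown in that case.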
CONNECTIVE is a thin layer on top of RxJS. While RxJS's API excels at short-lived and small reactive flows, CONNECTIVE provides an API that makes creating long-living and large and/or complex reactive flows easy and intuitive.
How To Install
Using NPM:
npm i @connectv/core
Getting it via a CDN:
<!-- Dependencies -->
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/lodash@4.17.14/lodash.min.js"></script>
<script src="https://unpkg.com/@connectv/core/dist/bundles/connective.es5.min.js"></script>
How To Use
Read the docs to make proper use of CONNECTIVE. A basic knowledge of RxJS would also help, though it is not mandatory. For basic usage or for getting started, here is a quick dive:
Quick Dive
CONNECTIVE is about creating reactive flows. A reactive flow typically starts with some source of data/events,
leading to some sinks for those data/events which would consume them, possibly going through and being transformed by
some other nodes along the way:
/*!*/import { source, sink, pin } from '@connectv/core';
let a = source();
let b = pin();
let c = sink(value => console.log(value));
/*!*/a.to(b).to(c); // --> data/events go from a to b then to c.
c.bind();
a.send('hellow!');
a.send('world!');
/*!*/import { source, sink, pin } from '@connectv/core';
let a = source();
let b = pin();
let c = sink(v => console.log(v));
/*!*/c.from(b).from(a); // --> data/events that c receives come from b which in turn come from a.
c.bind();
a.send('hellow!');
a.send('world!');
Every node of such a flow is represented by a Pin (or some PinLike object). The pin(), source() and sink() functions all return a Pin. You can connect pins to each other using their .to() and .from() methods.
You can pass multiple parameters to .to() and .from(), causing the Pin to be connected to multiple pins:
import { source, sink } from '@connectv/core';
let a = source();
let b = sink(value => console.log('B::' + value));
let c = sink(value => console.log('C::' + value));
/*!*/a.to(b, c); // --> stuff from a goes to both b and c
b.bind(); c.bind();
a.send('hellow!');
a.send('world!');
import { source, sink } from '@connectv/core';
let a = source();
let b = source();
let c = sink(value => console.log(value));
/*!*/c.from(a, b); c.bind(); // --> c's stuff comes from both a and b
a.send('hellow!');
b.send('world!');
Serial connections
You can also connect a series of pins to each other using the .serialTo() and .serialFrom() methods:
/*!*/import { source, sink, group } from '@connectv/core';
let a = source();
let b = source();
let c = sink(value => console.log('C::' + value));
let d = sink(value => console.log('D::' + value));
/*!*/group(a, b).serialTo(c, d);
/*!*/group(c, d).bind();
a.send('hellow!');
b.send('world!');
This means a is connected to c and b is connected to d.
Also note how you can call .bind() on both c and d simultaneously using group().
Subscribing
You can directly subscribe to any Pin using its .subscribe() method, so you don't need to use sink() all the time.
You can also use group() to work with a multitude of Pins at once:
import { source, pin, group } from '@connectv/core';
let a = source();
let b = source();
/*!*/group(a, b).to(pin()).subscribe(v => console.log(v));
a.send('hellow!');
b.send('world!');
Note that:
- the .bind() method is not present on all pin types.
- when you call .subscribe() or .bind() methods of a pin, the pin
becomes locked. You cannot connect any new pin to a locked pin.
- if a pin is connected to another locked pin, it will become locked as well.
- you can check if a pin is locked via its .locked property.
import { pin } from '@connectv/core';
let a = pin();
let b = pin();
a.to(b);
console.log('A:: ' + a.locked + ' B:: ' + b.locked);
// > 'A:: false B:: false'
b.subscribe();
console.log('A:: ' + a.locked + ' B:: ' + b.locked);
// > 'A:: true B:: true'
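The locking rules listed above can be pictured with a small stand-alone model. This is only a sketch: `ToyPin` is a made-up class, it is not how CONNECTIVE implements pins, and it models nothing but the `locked` flag spreading to the pins that feed a subscribed pin.

```typescript
// Toy model only: not CONNECTIVE's implementation of pins.
class ToyPin {
  private inbound: ToyPin[] = [];
  private _locked = false;

  get locked(): boolean { return this._locked; }

  // Connect this pin to `target`; refuses if the target is already locked.
  to(target: ToyPin): ToyPin {
    if (target._locked) throw new Error('cannot connect to a locked pin');
    target.inbound.push(this);
    return target;
  }

  // Subscribing locks this pin and everything feeding into it.
  subscribe(): void { this.lock(); }

  private lock(): void {
    if (this._locked) return; // already locked; also stops on loops
    this._locked = true;
    this.inbound.forEach(p => p.lock());
  }
}

const a = new ToyPin(), b = new ToyPin();
a.to(b);
console.log(a.locked, b.locked); // false false
b.subscribe();
console.log(a.locked, b.locked); // true true
```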
Sources
You can also create sources from plain values or from Observables:
/*!*/import { wrap, value, pin, group } from '@connectv/core';
import { interval } from 'rxjs';
/*!*/let a = value('hellow world!'); // --> a emits 'hellow world!' once
/*!*/let b = wrap(interval(1000)); // --> b emits a number every 1000ms
group(a, b).to(pin()).subscribe(v => console.log(v));
Note that you cannot connect any pin to wrap().
Spread
You can use spread() to turn incoming values into multiple emissions:
/*!*/import { spread, value } from '@connectv/core';
value([1, 2, 3, 4, 5, 6])
/*!*/.to(spread())
.subscribe(v => console.log(v));
Filter & Map
You can change incoming values using map(), and let them through or block them using filter():
/*!*/import { spread, value, filter, map } from '@connectv/core';
value([1, 2, 3, 4, 5, 6])
.to(spread())
/*!*/.to(filter(v => v % 2 == 0)) // --> only allow even ones through
/*!*/.to(map(v => v * 10)) // --> multiply each value by 10
.subscribe(v => console.log(v));
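As a rough analogy, the same steps on a plain array look like the sketch below. No CONNECTIVE code is involved here, and the analogy is loose: the flow emits the values one at a time rather than building an array.

```typescript
// Plain-array analogy of spread() -> filter() -> map(); no library involved.
const emitted = [1, 2, 3, 4, 5, 6]
  .filter(v => v % 2 == 0) // only even values pass
  .map(v => v * 10);       // each remaining value is multiplied by 10

console.log(emitted); // [20, 40, 60]
```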
Control
You can break your flow into multiple branches and join them back using control():
/*!*/import { spread, value, map, control } from '@connectv/core';
value([1, 2, 3, 4, 5, 6])
.to(spread())
.to(
map(v => v * 10),
map(v => v * 100),
)
/*!*/.to(control()) // --> will wait for all incoming emissions and join them pair-wise
.subscribe(v => console.log(v));
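"Joining pair-wise" can be pictured as a zip of the two branches. The sketch below is plain TypeScript with a made-up `joinPairwise` helper; the tuple shape of the joined value is an assumption of this sketch, not something taken from the CONNECTIVE docs.

```typescript
// Toy pair-wise join (a "zip"). The [left, right] tuple shape is assumed.
function joinPairwise<A, B>(left: A[], right: B[]): [A, B][] {
  const n = Math.min(left.length, right.length);
  const out: [A, B][] = [];
  for (let i = 0; i < n; i++) out.push([left[i], right[i]]);
  return out;
}

const values = [1, 2, 3, 4, 5, 6];
const joined = joinPairwise(
  values.map(v => v * 10),  // first branch
  values.map(v => v * 100), // second branch
);

console.log(joined[0], joined[5]); // [10, 100] [60, 600]
```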
Loops
You can even create loops in your flow:
/*!*/import { source, map, filter } from '@connectv/core';
let a = source();
/*!*/let m = map(x => x + 1); // --> lets give this pin a name so we can loop back to it
a.to(m)
/*!*/.to(filter(x => x < 10)) // --> keep looping until we reach 10
/*!*/.to(m) // --> looping back to m
.subscribe(v => console.log(v));
a.send(0);
Pipe
In the example above, the order is not necessarily preserved. You can for example enforce the order by using RxJS's delay() operator:
/*!*/import { source, map, filter, pipe } from '@connectv/core';
import { delay } from 'rxjs/operators';
let a = source();
let m = map(x => x + 1); // --> lets give this pin a name so we can loop back to it
a.to(m)
.to(filter(x => x < 10)) // --> keep looping until we reach 10
/*!*/.to(pipe(delay(1))) // --> wait a bit
.to(m) // --> looping back to m
.subscribe(v => console.log(v));
a.send(0);
pipe() allows you to use any RxJS pipeable operator.
Note that values are not passed directly to pipeable operators. They are wrapped in Emission objects, and the operators' return values should also be of that type.
Composition
You can re-use parts of your reactive flows using composition():
/*!*/import { composition, source, pin, filter, sink } from '@connectv/core';
/*!*/const evenOdd = composition(() => { //--> define the composition
/*!*/ let input = pin(); //--> it has one input pin
/*!*/ let odd = input.to(filter(x => x % 2 == 1));
/*!*/ let even = input.to(filter(x => x % 2 == 0));
/*!*/
/*!*/ return [{ input }, { odd, even }]; //--> return the inputs and outputs in the end
/*!*/});
//
// now using the composition:
//
let a = source();
/*!*/a.to(evenOdd()).serialTo( //--> `serialTo()` here connects ...
sink(v => console.log('ODD:: ' + v)), //... the first output of `evenOdd()` to this sink ...
sink(v => console.log('EVEN:: ' + v)) //... and the second output to this sink.
).subscribe();
a.send(2);
a.send(3);
a.send(4);
The composition is just a part of your reactive flow, with some pins going into it (i.e. inputs)
and some pins coming out of it (i.e. outputs). These two groups of pins are marked in the
composition definition:
return [{ input }, { odd, even }];
Which means input is the input pin of this composition and odd and even are the output pins.
In the example above, a.to(evenOdd()) connects a to all of the input pins of evenOdd() (which is one pin, input), and the serialTo() connects the outputs (odd and even) correspondingly to provided sinks. You can access these input and output pins individually as well:
let e = evenOdd();
// --> so basically these are all equivalent:
/*!*/a.to(e.in('input'));
/*!*/e.out('odd').to(sink(v => console.log('ODD:: ' + v))).subscribe();
/*!*/e.out('even').to(sink(v => console.log('EVEN:: ' + v))).subscribe();
// ... using group:
/*!*/a.to(e.in('input'));
/*!*/group(e.out('odd'), e.out('even'))
/*!*/.serialTo(
/*!*/ sink(v => console.log('ODD:: ' + v)),
/*!*/ sink(v => console.log('EVEN:: ' + v))
/*!*/).subscribe();
// ... implicit connections:
/*!*/a.to(e).serialTo(
/*!*/ sink(v => console.log('ODD:: ' + v)),
/*!*/ sink(v => console.log('EVEN:: ' + v))
/*!*/).subscribe();
Agent
Compositions are sub-classes of the Agent class. This base class represents all re-usable reactive flows. CONNECTIVE comes with a number of useful agents that you can use in creating your own flows and compositions.
The main property of an Agent is its Signature. A signature is simply an object with a list of strings named inputs and a list of strings named outputs. These two lists denote the possible names of the incoming and outgoing pins of the Agent.
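As a sketch of that shape, the type below follows the description above. The names `ToySignature`, `evenOddSignature` and `hasOutput` are made up for illustration and are not exports of `@connectv/core`.

```typescript
// Assumed shape, following the description above: two lists of pin names.
interface ToySignature {
  inputs: string[];
  outputs: string[];
}

// A signature matching the `evenOdd` composition from the previous section.
const evenOddSignature: ToySignature = {
  inputs: ['input'],
  outputs: ['odd', 'even'],
};

// Hypothetical helper: does this signature list the given output name?
function hasOutput(sig: ToySignature, name: string): boolean {
  return sig.outputs.indexOf(name) !== -1;
}

console.log(hasOutput(evenOddSignature, 'odd'));    // true
console.log(hasOutput(evenOddSignature, 'result')); // false
```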
Expr
expr() allows you to convert any function into an agent:
/*!*/import { source, expr, group } from '@connectv/core';
let a = source();
let b = source();
group(a, b)
/*!*/.serialTo(expr((a, b) => `${a} + ${b} = ${a + b}`)) //--> a is connected to the first input of the expr,
//... b to the second
.subscribe(console.log); //... and the output is subscribed to.
a.send(2); b.send(3);
a.send(42); b.send(30);
Notice how the expr is called for every new value provided by the sources after the two initial values.
In the example above, we have again used .serialTo() to implicitly connect to inputs of the expr(), but you could retrieve them individually as well:
let e = expr((a, b) => a + b);
group(a, b).serialTo(e.in(0), e.in(1));
e.result.subscribe(console.log); // --> this is short-hand for `e.out('result')`
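One way to read the note about the expr being called for every new value after the two initial values is the toy model below: it remembers the latest value of each input and starts producing results once both have received something. `toyExpr` and its `sendA`/`sendB` methods are made up for this sketch; it is an interpretation of that note, not a description of the library's internals.

```typescript
// Toy model of a two-input expr: keeps the latest value per input and
// calls `fn` on every new value once both inputs have been set.
function toyExpr<A, B, R>(fn: (a: A, b: B) => R, onResult: (r: R) => void) {
  let a: { value: A } | undefined;
  let b: { value: B } | undefined;
  const run = () => { if (a && b) onResult(fn(a.value, b.value)); };
  return {
    sendA(value: A) { a = { value }; run(); },
    sendB(value: B) { b = { value }; run(); },
  };
}

const results: string[] = [];
const sum = toyExpr(
  (x: number, y: number) => `${x} + ${y} = ${x + y}`,
  r => { results.push(r); },
);

sum.sendA(2); sum.sendB(3);    // first result once both inputs have a value
sum.sendA(42); sum.sendB(30);  // every later value triggers a new result

console.log(results); // ['2 + 3 = 5', '42 + 3 = 45', '42 + 30 = 72']
```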
Node
A Node is basically a more complex version of an Expr, allowing you to emit to any outputs denoted in the signature (instead of just the "result" pin). You can create nodes easily using the node() function:
/*!*/import { source, node } from '@connectv/core';
/*!*/let N = node({ inputs: ['a'], outputs: ['even', 'odd']}, // --> set the signature
/*!*/ (input, output) => {
/*!*/ if (input.a % 2 == 0) output('even', input.a);
/*!*/ else output('odd', input.a);
/*!*/ });
let a = source();
/*!*/let n = N(); // --> create a new instance of the agent we defined above
/*!*/
/*!*/a.to(n.in('a'));
/*!*/n.out('even').subscribe(v => console.log('EVEN:: ' + v));
/*!*/n.out('odd').subscribe(v => console.log('ODD:: ' + v));
a.send(3);
a.send(4);
a.send(5);
In this example we have fetched each input and output pin of n individually and connected to it. You can of course shorten this code using .serialTo():
import { source, node, sink } from '@connectv/core';
let N = node({ inputs: ['a'], outputs: ['even', 'odd']}, // --> set the signature
(input, output) => {
if (input.a % 2 == 0) output('even', input.a);
else output('odd', input.a);
});
let a = source();
/*!*/a.to(N()).serialTo(
/*!*/ sink(v => console.log('EVEN:: ' + v)),
/*!*/ sink(v => console.log('ODD:: ' + v))
/*!*/).subscribe();
a.send(3);
a.send(4);
a.send(5);
Gate
You can control when some emission passes through using gate():
/*!*/import { source, gate, map, spread, pipe, group, control } from '@connectv/core';
import { delay } from 'rxjs/operators';
let a = source();
/*!*/let g = gate();
/*!*/group(g.output, control()) // --> when the gate outputs something,
// ... `control()` here ensures one opening of
// ... the gate before the first output
.to(pipe(delay(1000))) // --> wait one second,
/*!*/.to(g.control); // --> open the gate
a.to(map(v => v.split(' '))) // --> get all the words
.to(spread()) // --> spread them
/*!*/.to(g) // --> pass it through the gate
.subscribe(console.log); // --> log it when it comes out
a.send("Hellow darkness my old friend I've come to talk with you again");
In this example, g.control is a control pin that will cause the gate to let emissions through when it receives a value itself. This pin is not mentioned in the gate's signature. Alongside gate, some other agent types (such as node() and expr()) also have this controlling pin (always accessible via .control) which allows controlling the behavior of the agent.
State
You can keep reactive state values within your flows using state(). For example, the following code introduces a composition that represents the state of an HTML input, which is built on top of state(), and then uses that composition to bind two inputs together:
/*!*/import { wrap, map, state, sink, composition, pin } from '@connectv/core';
import { fromEvent } from 'rxjs';
function inputState(el: HTMLInputElement) {
return composition(track => {
let input = pin(), output = pin();
/*!*/ let model = state(); track(model); //--> track the state for later cleaning it up ...
/*!*/ input.to(model) //--> the input sets the state
/*!*/ .to(sink(v => el.value = v)) //--> the HTML element value is also set based on that
/*!*/ .to(output); //--> and it all goes to the output
wrap(fromEvent(el, 'input')) //--> when the user changes the input as well
.to(map(() => el.value)) //--> we will fetch the new value of the input
.to(model); //--> and update the state using it
return [{input}, {output}];
})();
}
let a = inputState(document.getElementById('a') as HTMLInputElement);
let b = inputState(document.getElementById('b') as HTMLInputElement);
/*!*/a.to(b); // --> literal two-way binding
/*!*/b.to(a);
a.bind(); // --> .bind() on composition will call .bind() on all child nodes having that method
b.bind();
Sequence
The sequence() agent allows you to distinguish specific sequences of events coming from various sources:
/*!*/import { wrap, map, sequence, group } from '@connectv/core';
import { fromEvent } from 'rxjs';
group(
wrap(fromEvent(document, 'mousedown')),
wrap(fromEvent(document, 'mousemove')),
wrap(fromEvent(document, 'mouseup'))
)
.serialTo(
/*!*/ sequence(1, 0, 1), //--> click is 1 mouse down, 0 mousemove, 1 mouse up
/*!*/ sequence(1, '+', 1) //--> drag is 1 mouse down, 1 or more mousemoves, 1 mouse up
)
.serialTo(
map(() => 'CLICK'),
map(() => 'DRAG')
).subscribe(console.log);
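The patterns passed to sequence() above can be read as "how many events from each source, in order". The sketch below checks a finished list of events against such a pattern. It is a simplified stand-alone model with made-up names (`Count`, `matchesPattern`), it handles only the numeric and `'+'` entries used above, and it does not reproduce how the agent tracks a live stream.

```typescript
type Count = number | '+'; // exact number of events, or '+' for one or more

// `events` holds source indices in arrival order, e.g. 0 = mousedown,
// 1 = mousemove, 2 = mouseup. Returns true if they fit the pattern.
function matchesPattern(pattern: Count[], events: number[]): boolean {
  let i = 0;
  for (let src = 0; src < pattern.length; src++) {
    let n = 0;
    while (i < events.length && events[i] === src) { n++; i++; }
    const want = pattern[src];
    if (want === '+' ? n < 1 : n !== want) return false;
  }
  return i === events.length; // no leftover events
}

console.log(matchesPattern([1, 0, 1], [0, 2]));         // true: a click
console.log(matchesPattern([1, '+', 1], [0, 1, 1, 2])); // true: a drag
console.log(matchesPattern([1, 0, 1], [0, 1, 2]));      // false: moved in between
```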
Memory Management
All pins and agents in CONNECTIVE come with a .clear() method that will dispose all of their internal references. The .clear() method of a Composition will also clear all of its child nodes.
WARNING: Calling .clear() will make the pin or the agent completely unusable afterwards, so you should call it only when you are done with the object.
let myPin = ...;
let myAgent = ...;
//
// do your stuff
//
myPin.clear();
myAgent.clear();
Additionally, every .subscribe() method returns an instance of RxJS's Subscription, which is equipped with a .unsubscribe() method. You should keep references to the subscriptions that you want to dispose of later, and call their .unsubscribe() method.
let myPin = ...;
//
// do whatever
//
let sub = myPin.subscribe(...);
//
// some other stuff
//
sub.unsubscribe();
If you need to call your clearing logic when some other flow is finished/cleared, call it on the complete notification of your subscription to the flow:
mySource
.to(...)
.to(...)
.subscribe(
value => do_something_with_value(value),
undefined,
() => clear_everything_up()
);
You can find more details about memory management in CONNECTIVE here.
Under the Hood
As stated above, CONNECTIVE is just a thin layer on top of RxJS. This means that CONNECTIVE constructs RxJS observable sequences for you based on the description of the reactive flow you have provided.
More precisely, each Pin is basically a lazy Observable. When you connect it to other pins or connect pins to it, the Observable is still not realized. When the Observable is accessed, for example through the .subscribe() method or through the .observable property, the Observable will be created and the Pin will be locked.
Generally CONNECTIVE masks the API of RxJS, so for simpler use cases you do not need to work with RxJS directly. That is why knowledge of RxJS is not mandatory to get started with CONNECTIVE.
However, none of the functionality of RxJS is masked just for the sake of masking it, so for more advanced usage you would definitely need to be familiar with RxJS.
You can read more in detail on how CONNECTIVE creates observable sequences based on your flows here.
CONNECTIVE v RxJS
An important thing to notice is that CONNECTIVE is designed NOT as an alternative to RxJS. It is rather meant to act as a complement to RxJS specifically in the following ways:
- It allows description of a reactive flow in any order,
- It provides a different model of reactive flows,
- It provides standard abstractions for flow re-use.
This simply means that for smaller and simpler flows, the difference between the two is negligible. However, the benefits of CONNECTIVE become more and more pronounced over larger and more complex reactive flows.
You can read more about this in this entry.
- For bugs, issues and suggestions, the best way is to create issues or pull requests to the repository.
- For questions, feedback, etc. join the conversation on Gitter.
- You can also drop me an email anytime.