CONNECTIVE facilitates large-scale reactive programming in Javascript/Typescript. It enables declarative creation of large and complex data/event flows and encourages re-use of flows.
npm i @connectv/core


A Hellow World! example:
/*!*/import { wrap, map, filter } from '@connectv/core';
import { fromEvent } from 'rxjs';

let a = document.getElementById('a') as HTMLInputElement;
let p = document.getElementById('p');

//
// Will say hello to everyone but 'Donald'.
// For obvious reasons.
//

/*!*/wrap(fromEvent(a, 'input'))           // --> wrap the `Observable` in a `Pin`
/*!*/.to(map(() => a.value))               // --> map the event to value of the input
/*!*/.to(filter(name => name != 'Donald')) // --> filter 'Donald' out
/*!*/.to(map(name => 'hellow ' + name))    // --> add 'hellow' to the name
/*!*/.subscribe(msg => p.innerHTML = msg); // --> write it to the <p> element
A more elaborate example:
/*!*/import { wrap, pipe, map, filter, sink } from '@connectv/core';
import { fromEvent, timer } from 'rxjs';
import { delay, debounceTime } from 'rxjs/operators';

let a = document.getElementById('a');
let p = document.getElementById('p');

//
// Will calculate fibonacci sequence up to given index, displaying every number in the
// sequence along the way.
//

// --> calculate next iteration step on fibonacci sequence
/*!*/let m = map(([next, prev, l]) => [next + prev, next, l - 1]);

/*!*/wrap(fromEvent(a, 'input'))                    // --> wrap the `Observable` in a `Pin`
/*!*/.to(pipe(debounceTime(1000)))                // --> wait for people to type in the number
/*!*/.to(map(() => parseInt((a as any).value)))   // --> map the input event to value of the input
/*!*/.to(map(n => [1, 0, n]))                     // --> map the number to start iteration
/*!*/.to(filter(([_, __, l]) => l >= 0))          // --> check if we should do any iteration
/*!*/.to(m)                                       // --> calculate next step
/*!*/.to(pipe(delay(300)))                        // --> take a breath
/*!*/.to(filter(([_, __, l]) => l > 0))           // --> check if we should continue
/*!*/.to(m)                                       // --> back to the loop
/*!*/.to(map(([_, f, __]) => f))                  // --> btw, lets take each number in the sequence
/*!*/.to(sink(v => p.innerHTML = v))              // --> set the text of <p> to the fib number
/*!*/.subscribe();                                // --> bind the whole thing.

CONNECTIVE is a thin layer on top of RxJS. While RxJS's API excels at short-lived and small reactive flows, CONNECTIVE provides an API that makes creating long-living and large and/or complex reactive flows easy and intuitive.

How To Install

Using NPM:
npm i @connectv/core

Getting it via a CDN:
<!-- Click on each line to copy it -->

<!-- Dependencies -->
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/lodash@4.17.14/lodash.min.js"></script>

<script src="https://unpkg.com/@connectv/core/dist/bundles/connective.es5.min.js"></script>


How To Use

Read the docs for properly utilizing CONNECTIVE. A basic knowledge of RxJS would also help, though it is not mandatory. For basic usage and/or for getting started, here is a quick-dive:

Quick Dive

CONNECTIVE is about creating reactive flows. A reactive flow typically starts with some source of data/events, leading to some sinks for those data/events which would consume them, possibly going through and being transformed by some other nodes along the way:
/*!*/import { source, sink, pin } from '@connectv/core';

let a = source();
let b = pin();
let c = sink(value => console.log(value));

/*!*/a.to(b).to(c); // --> data/events go from a to b then to c.

c.bind();

a.send('hellow!');
a.send('world!');
/*!*/import { source, sink, pin } from '@connectv/core';

let a = source();
let b = pin();
let c = sink(v => console.log(v));

/*!*/c.from(b).from(a); // --> data/events that c receives come from b which in turn come from a.

c.bind();

a.send('hellow!');
a.send('world!');
Every node of such a flow is represented by a Pin (or some PinLike object). The pin(), source() and sink() functions all return a Pin.

You can connect pins to each other using their .to() and .from() methods. You can pass multiple parameters to .to() and .from(), causing the Pin to be connected to multiple pins:
import { source, sink } from '@connectv/core';

let a = source();
let b = sink(value => console.log('B::' + value));
let c = sink(value => console.log('C::' + value));

/*!*/a.to(b, c); // --> stuff from a goes to both b and c
b.bind(); c.bind();

a.send('hellow!');
a.send('world!');
import { source, sink } from '@connectv/core';

let a = source();
let b = source();
let c = sink(value => console.log(value));

/*!*/c.from(a, b); c.bind(); // --> c's stuff come from both a and b

a.send('hellow!');
b.send('world!');

Serial connections

You can also connect series of pins to each other using .serialTo() and .serialFrom() methods:
/*!*/import { source, sink, group } from '@connectv/core';

let a = source();
let b = source();
let c = sink(value => console.log('C::' + value));
let d = sink(value => console.log('D::' + value));

/*!*/group(a, b).serialTo(c, d);
/*!*/group(c, d).bind();

a.send('hellow!');
b.send('world!');
This means a is connected to c and b is connected to d.

Also note how you can call .bind() on both c and d simultaenously using group().

Subscribing

You can directly subscribe on any Pin using its .subscribe() method, so you don't need to use sink() all the time. You can also use group() to work with a multitude of Pins at once:
import { source, pin, group } from '@connectv/core';

let a = source();
let b = source();

/*!*/group(a, b).to(pin()).subscribe(v => console.log(v));

a.send('hellow!');
b.send('world!');
Note that:
import { pin } from '@connectv/core';

let a = pin();
let b = pin();
a.to(b);

console.log('A:: ' + a.locked + ' B:: ' + b.locked);
// > 'A:: false B:: false'

b.subscribe();
console.log('A:: ' + a.locked + ' B:: ' + b.locked);
// > 'A:: true B:: true'

Sources

You can also create sources from plain values or from Observables:
/*!*/import { wrap, value, pin, group } from '@connectv/core';
import { interval } from 'rxjs';

/*!*/let a = value('hellow world!'); // --> a emits 'hellow world!' once
/*!*/let b = wrap(interval(1000));   // --> b emits a number every 1000ms

group(a, b).to(pin()).subscribe(v => console.log(v));
Note that you cannot connect any pin to wrap().

Spread

You can use spread() to turn incoming values into multiple emissions:
/*!*/import { spread, value } from '@connectv/core';

value([1, 2, 3, 4, 5, 6])
/*!*/.to(spread())
.subscribe(v => console.log(v));

Filter & Map

You can change incoming values using map(), and let them through or block them using filter():
/*!*/import { spread, value, filter, map } from '@connectv/core';

value([1, 2, 3, 4, 5, 6])
.to(spread())
/*!*/.to(filter(v => v % 2 == 0)) // --> only allow even ones through
/*!*/.to(map(v => v * 10))        // --> multiply each value by 10
.subscribe(v => console.log(v));

Control

You can break your flow into multiple branches and join them back using control():
/*!*/import { spread, value, map, control } from '@connectv/core';

value([1, 2, 3, 4, 5, 6])
.to(spread())
.to(
  map(v => v * 10),
  map(v => v * 100),
)
/*!*/.to(control()) // --> will wait for all incoming emissions and join them pair-wise
.subscribe(v => console.log(v));

Loops

You can even create loops in your flow:
/*!*/import { source, map, filter } from '@connectv/core';

let a = source();
/*!*/let m = map(x => x + 1); // --> lets give this pin a name so we can loop back to it

a.to(m)
/*!*/.to(filter(x => x < 10)) // --> keep looping until we reach 10
/*!*/.to(m)                   // --> looping back to m
.subscribe(v => console.log(v));

a.send(0);

Pipe

In the example above, the order is not necessarily preserved. You can for example enforce the order by using RxJS's delay() operator:
/*!*/import { source, map, filter, pipe } from '@connectv/core';
import { delay } from 'rxjs';

let a = source();
let m = map(x => x + 1); // --> lets give this pin a name so we can loop back to it

a.to(m)
.to(filter(x => x < 10)) // --> keep looping until we reach 10
/*!*/.to(pipe(delay(1)))      // --> wait a bit
.to(m)                   // --> looping back to m
.subscribe(v => console.log(v));

a.send(0);
pipe() allows you to use any RxJS pipeable operator. Note that values are not passed directly to pipeable operators, they are wrapped in Emission objects, and their return value should also be of that type.

Composition

You can re-use parts of your reactive flows using composition():
/*!*/import { composition, source, pin, filter, sink } from '@connectv/core';

/*!*/const evenOdd = composition(() => {             //--> define the composition
/*!*/  let input = pin();                            //--> it has one input pin
/*!*/  let odd = input.to(filter(x => x % 2 == 1));  
/*!*/  let even = input.to(filter(x => x % 2 == 0));
/*!*/
/*!*/  return [{ input }, { odd, even }];            //--> return the inputs and outputs in the end
/*!*/});

//
// now using the composition:
//

let a = source();
/*!*/a.to(evenOdd()).serialTo(                      //--> `serialTo()` here connects ...
  sink(v => console.log('ODD:: ' + v)),        //... the first output of `evenOdd()` to this sink ...
  sink(v => console.log('EVEN:: ' + v))        //... and the second output to this sink.
).subscribe();

a.send(2);
a.send(3);
a.send(4);
The composition is just a part of your reactive flow, with some pins going into it (i.e. inputs) and some pins coming out of it (i.e. outputs). These two groups of pins are marked in the composition definition:
  return [{ input }, { odd, even }];
Which means input is the input pin of this composition and odd and even are the output pins.

In the example above, a.to(evenOdd()) connects a to all of the input pins of evenOdd() (which is one pin, input), and the serialTo() connects the outputs (odd and even) correspondingly to provided sinks. You can access these input and output pins individually as well:
let e = evenOdd();

// --> so basically these are all equivalent:

/*!*/a.to(e.in('input'));
/*!*/e.out('even').to(sink(v => console.log('ODD:: ' + v))).subscribe();
/*!*/e.out('odd').to(sink(v => console.log('EVEN:: ' + v))).subscribe();

// ... using group:

/*!*/a.to(e.in('input'));
/*!*/group(e.out('odd'), e.out('even'))
/*!*/.serialTo(
/*!*/  sink(v => console.log('ODD:: ' + v)),
/*!*/  sink(v => console.log('EVEN:: ' + v))
/*!*/).subscribe();

// ... implicit connections:

/*!*/a.to(e).serialTo(
/*!*/  sink(v => console.log('ODD:: ' + v)),
/*!*/  sink(v => console.log('EVEN:: ' + v))
/*!*/).subscribe();


Agent

Compositions are sub-classes of Agent class. This base class represents all re-usable reactive flows. CONNECTIVE comes with a number of useful agents that you can use in creating your own flows and compositions.

The main property of an Agent is its Signature. A signature is simply an object with a list of strings named inputs and a list of strings named outputs. These two lists denote the possible names of the incoming and outgoing pins of the Agent.

Expr

expr() allows you to convert any function into an agent:
/*!*/import { source, expr, group } from '@connectv/core';

let a = source();
let b = source();

group(a, b)
/*!*/.serialTo(expr((a, b) => `${a} + ${b} = ${a + b}`)) //--> a is connected to the first input of the expr,
                                                    //... b to the second
.subscribe(console.log);                            //... and the output is subscribed to.

a.send(2); b.send(3);
a.send(42); b.send(30);
Notice how the expr is called for every new value provided by the sources after the two initial values.

In the example above, we have again used .serialTo() to implicitly connect to inputs of the expr(), but you could retrieve them individually as well:
let e = expr((a, b) => a + b);

group(a, b).serialTo(e.in(0), e.in(1));
e.result.subscribe(console.log);        // --> this is short-hand for `e.out('result')`


Node

A Node is basically a more complex version of an Expr, allowing you to emit to any outputs denoted in the signature (instead of just "result" pin). You can create nodes easily using node() function:
/*!*/import { source, node } from '@connectv/core';

/*!*/let N = node({ inputs: ['a'], outputs: ['even', 'odd']}, // --> set the signature
/*!*/  (input, output) => {
/*!*/    if (input.a % 2 == 0) output('even', input.a);
/*!*/    else output('odd', input.a);
/*!*/  });


let a = source();
/*!*/let n = N(); // --> create a new instance of the agent we defined above
/*!*/
/*!*/a.to(n.in('a'));
/*!*/n.out('even').subscribe(v => console.log('EVEN:: ' + v));
/*!*/n.out('odd').subscribe(v => console.log('ODD:: ' + v));

a.send(3);
a.send(4);
a.send(5);
In this example we have fetched each input and output pin of n individually and connected to it. You can of-course shorten this code using .serialTo():
import { source, node, sink } from '@connectv/core';

let N = node({ inputs: ['a'], outputs: ['even', 'odd']}, // --> set the signature
  (input, output) => {
    if (input.a % 2 == 0) output('even', input.a);
    else output('odd', input.a);
  });


let a = source();
/*!*/a.to(N()).serialTo(
/*!*/  sink(v => console.log('EVEN:: ' + v)),
/*!*/  sink(v => console.log('ODD:: ' + v))
/*!*/).subscribe();

a.send(3);
a.send(4);
a.send(5);

Gate

You can control when some emission passes through using gate()
/*!*/import { source, gate, map, spread, pipe, group, control } from '@connectv/core';
import { delay } from 'rxjs/operators';

let a = source();
/*!*/let g = gate();

/*!*/group(g.output, control())   // --> when the gate outputs something,
                             // ... `control()` here ensures one opening of 
                             // ... the gate before the first output
.to(pipe(delay(1000)))       // --> wait one second,
/*!*/.to(g.control);              // --> open the gate

a.to(map(v => v.split(' '))) // --> get all the words
.to(spread())                // --> spread them
/*!*/.to(g)                       // --> pass it through the gate
.subscribe(console.log);     // --> log it when it comes out

a.send("Hellow darkness my old friend I've come to talk with you again");
In this example, g.control is a control pin that will cause the gate to let emissions through when it receives a value itself. This pin is not mentioned in the gate's signature. Alongside gate, some other agent types (such as node() and expr()) also have this controlling pin (always accessible via .control) which allows controlling the behavior of the agent.

State

You can keep reactive state values within your flows using state(). For example, the following code introduces a composition that represents the state of a html input, which is built on top of state(), and then uses that composition to bind two inputs together:
/*!*/import { wrap, map, state, sink, composition, pin } from '@connectv/core';
import { fromEvent } from 'rxjs';

function inputState(el: HTMLInputElement) {
  return composition(track => {
    let input = pin(), output = pin();
/*!*/    let model = state(); track(model);   //--> track the state for later cleaning it up ...

/*!*/    input.to(model)                      //--> the input sets the state
/*!*/      .to(sink(v => el.value = v))       //--> the HTML element value is also set based on that
/*!*/      .to(output);                       //--> and it all goes to the output

    wrap(fromEvent(el, 'input'))         //--> when the user changes the input as well
      .to(map(() => el.value))           //--> we will fetch the new value of the input
      .to(model);                        //--> and update the state using it

    return [{input}, {output}];
  })();
}


let a = inputState(document.getElementById('a') as HTMLInputElement);
let b = inputState(document.getElementById('b') as HTMLInputElement);

/*!*/a.to(b);  // --> literal two-way binding
/*!*/b.to(a);

a.bind(); // --> .bind() on composition will call .bind() on all child nodes having that method
b.bind();

Sequence

The sequence() agent allows you to distinguish specific sequences of events coming from various sources:
/*!*/import { wrap, map, sequence, group } from '@connectv/core';
import { fromEvent } from 'rxjs';

group(
  wrap(fromEvent(document, 'mousedown')),
  wrap(fromEvent(document, 'mousemove')),
  wrap(fromEvent(document, 'mouseup'))
)
.serialTo(
/*!*/  sequence(1, 0, 1),   //--> click is 1 mouse down, 0 mousemove, 1 mouse up
/*!*/  sequence(1, '+', 1)  //--> drag is 1 mouse down, +1 mousemoves, 1 mouse up
)
.serialTo(
  map(() => 'CLICK'),
  map(() => 'DRAG')
).subscribe(console.log);

Memory Management

All pins and agents in CONNECTIVE come with a .clear() method that will dispose all of their internal references. The .clear() method of a Composition will also clear all of its child nodes.

WARNING: Calling .clear() will make the pin or the agent completely unusable afterwards, so you should call it only when you are done with the object.
let myPin = ...;
let myAgent = ...;

//
// do your stuff
//

myPin.clear();
myAgent.clear();
Additionally, every .subscribe() method returns an instance of RxJS's Subscription , which is equiped with a .unsubscribe() method. You should maintain the references for subscribptions that you would want to dispose of later, and call their .unsubscribe() method.
let myPin = ...;
//
// do whatever
//

let sub = myPin.subscribe(...);

//
// some other stuff
//

sub.unsubscribe();
If you need to call your clearing logic when some other flow is finished/cleared, call it on complete notification of your subscription to the flow:
mySource
.to(...)
.to(...)
.subscribe(
  value => do_something_with_value(value),
  undefined,
  () => clear_everything_up()
);
You can find more details about memory management in CONNECTIVE here.



Under the Hood

As stated above, CONNECTIVE is just a thin layer on top of RxJS. This means that CONNECTIVE constructs RxJS observable sequences for you based on the description of the reactive flow you have provided.

More precisely, each Pin is basically a lazy Observable. When you connect it to other pins or connect pins to it the Observable is still not realized, and when the Observable is accessed, for example through .subscribe() method or through .observable property, the Observable will be created and the Pin will be locked.

Generally CONNECTIVE masks the API of RxJS so for simpler use cases you do not need to work with RxJS directly, that's why knowledge of RxJS is not mandatory to get started with CONNECTIVE. However, none of the functionality of RxJS is masked just for masking it, so for more advanced usage you would definitely need to be familiar with RxJS.

You can read more in detail on how CONNECTIVE creates observable sequences based on your flows here.



CONNECTIVE v RxJS

An important thing to notice is that CONNECTIVE is designed NOT as an alternative to RxJS. It is rather meant to act as a complement to RxJS specifically in the following ways:

This simply means that for smaller and simpler flows, the difference between the two is negligble. However the benefits of CONNECTIVE become more and more pronounced over larger and more complex reactive flows. You can read more about this in this entry.



Contact



Copied to Clipboard!