Pin

pin() is a relay for incoming events/data.
Pin is the parent class of most of the more elaborate pin types in CONNECTIVE, and represents any node in a reactive flow. Different pin types exhibit different behaviors. For example, control() waits for all incoming connections before each emission, pack() waits for all incoming connections before the first emission and combines the latest emission from each of them from that point on, filter() only lets specific emissions through, etc.

In this entry we will discuss the properties shared by most pin types. Note that since Pin is a base class for other pin types, pin() itself is not used directly as often, though its direct use cases are also covered here.

Connecting pins

You can connect pins together using their .from() and .to() methods:
/*!*/import { pin } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();

/*!*/a.to(b);   //--> anything that comes through a will be passed to b
/*!*/c.from(a); //--> anything that comes through a will be passed to c as well
You can also pass multiple pins to the .to() and .from() methods:
/*!*/import { pin } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();

/*!*/a.to(b, c); //--> anything going through a will be passed to b and c
.to() and .from() return the pin passed to them, so that you can easily chain them:
/*!*/import { pin } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();

/*!*/console.log(a.to(b) == b); //> true
/*!*/
/*!*/a.to(b).to(c);  //--> a is connected to b, b is connected to c.
When you pass multiple pins to .to() or .from() methods, the return value is a group of pins, containing the pins you passed to the method:
/*!*/import { pin } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();
let d = pin();

/*!*/a.to(b, c).to(d);  //--> a is connected to b and c, b and c are connected to d.
You can start a sequence of connections with such a group, using group():
/*!*/import { pin, group } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();

/*!*/group(b, c).from(a); //--> a is connected to b and c.
You can also use .serialTo() and .serialFrom() to connect groups of pins to each other serially:
import { pin, group } from '@connectv/core';

let a = pin(), b = pin(), c = pin(), d = pin();

// a -> c
// b -> d
/*!*/group(a, b).serialTo(c, d);
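.serialTo() and .serialFrom() also return a group consisting of the pins passed to them, similar to .to() and .from():
let e = pin(); // continuing the previous example
group(a, b).serialTo(c, d).to(e); //--> a -> c and b -> d, then c and d are both connected to e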


Subscribing and binding

Pins are always in one of two states: bound or unbound. You can bind a pin using its .subscribe() method:
import { source, pin } from '@connectv/core';

let a = source();
let b = pin();

a.to(b);
/*!*/b.subscribe(console.log);

a.send('Hellow!');
a.send('World!');
or the .bind() method on a sink():
import { source, sink } from '@connectv/core';

let a = source();
/*!*/let b = sink(console.log);

a.to(b);
/*!*/b.bind();

a.send('Hellow!');
a.send('World!');
Note that before a pin is bound, it will not receive incoming events/data:
import { source, sink } from '@connectv/core';

let a = source();
let b = sink(console.log);

a.to(b);
/*!*/a.send('Hellow!'); //--> falling on deaf ears ...
/*!*/
/*!*/b.bind();
/*!*/a.send('World!');  //--> now this one gets heard.
You can check if a pin is bound or not using its .locked property:
let a = somePin();

if (a.locked) {
  // then it is going to receive stuff but you can no longer connect to it.
}
else {
  // it is not bound yet, so you can connect to it.
}
  


Binding chain

You do not need to manually bind all the pins in your flow. When one pin is bound, all the pins in the flow that the bound pin relies on are going to be bound as well:
import { pin } from '@connectv/core';

let a = pin();
let b = pin();
let c = pin();

a.to(b, c);

/*!*/b.subscribe();         //--> so we will lock b
/*!*/console.log(a.locked); //--> a is locked as well
/*!*/console.log(c.locked); //--> but c is not.
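
To make the propagation rule concrete, here is a toy model of a binding chain in plain TypeScript. It is only a sketch: the ToyPin class and its methods are hypothetical stand-ins written for this illustration, and they say nothing about how CONNECTIVE implements binding internally.

```typescript
// Toy model only: ToyPin is a hypothetical stand-in, not CONNECTIVE's Pin class.
class ToyPin {
  private inbound: ToyPin[] = []; // the pins this pin receives from
  locked = false;

  // Connect this pin to each of the given pins.
  to(...pins: ToyPin[]): void {
    for (const p of pins) p.inbound.push(this);
  }

  // Binding a pin also binds every pin it relies on (its upstream),
  // but leaves sibling and downstream pins untouched.
  bind(): void {
    if (this.locked) return;
    this.locked = true;
    for (const p of this.inbound) p.bind();
  }
}

const a = new ToyPin();
const b = new ToyPin();
const c = new ToyPin();

a.to(b, c);
b.bind();

console.log(a.locked, b.locked, c.locked); // prints: true true false
```

Binding b reaches a because b receives from a, while c stays unbound because nothing that was bound relies on it.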


Subscription

The .subscribe() method on pins returns an instance of RxJS's Subscription object. This object in turn has an .unsubscribe() method, which you can use to clean up the subscriptions you make to your flows:
import { source, pin } from '@connectv/core';

let a = source();
let b = pin();
let c = pin();

a.to(b, c);

/*!*/let sb = b.subscribe(v => console.log('B:: ' + v));
/*!*/let sc = c.subscribe(v => console.log('C:: ' + v));

a.send('hellow');   //--> picked up by both b and c

/*!*/sb.unsubscribe();   //--> b is out of the game ...
a.send('world');    //--> picked up by c only

/*!*/sc.unsubscribe();   //--> c is out of the game ...
a.send('well ...'); //--> falling on deaf ears


Special callbacks

The .subscribe() method has the exact same signature as the corresponding method on RxJS's Observables. This means you can supply it with error and/or complete callbacks as well:
import { source, map, pin } from '@connectv/core';

let a = source();

a.to(map(() => { throw new Error(); }))       //--> someone in the flow is going to throw an error
 .to(pin())
/*!*/ .subscribe(
/*!*/   () => {                                    //--> this is the usual callback
/*!*/     console.log('everything is peachy ...');
/*!*/   },
/*!*/   error => {                                 //--> this is the error callback
/*!*/     console.log('ERROR!');
/*!*/     console.log(error);
/*!*/   });

a.send();
The third callback is invoked when the flow completes, for example when its source is cleared:
import { source, pin } from '@connectv/core';

let a = source();

a.to(pin())
/*!*/ .subscribe(
/*!*/   v => { console.log('GOT:: ' + v); },  //--> this is the usual callback
/*!*/   error => { console.log('ERROR!'); },  //--> this is the error callback
/*!*/   () => { console.log('COMPLETE!'); }   //--> this will be called when the flow is closed off
/*!*/ );

a.send(12);
a.send('YOLO!');
a.clear();


Clearing up

It is a good idea to clear up any pin using its .clear() method after you are done with it. This method disconnects the pin from the rest of the flow and also cancels the subscriptions made via the .subscribe() method.
import { source, pin } from '@connectv/core';

let a = source();
let b = pin();

a.to(b);
b.subscribe(v => console.log(v));

a.send('hellow');                     //--> goes through

/*!*/b.clear();
a.send('world!');                     //--> does not go through

b.subscribe(v => console.log(v));
a.send('yo mate!');                   //--> still not going through
There is no guarantee that a pin will be usable after being cleared, so call .clear() only when you are absolutely done with it. Most of the time you should not need to call this method directly, as other mechanisms in CONNECTIVE, such as Compositions, take care of this automatically.

NOTE: It is NOT guaranteed that .clear() will unsubscribe all the subscriptions to the pin, as subscriptions created by means other than calling .subscribe() might go untracked. If you make such subscriptions, be sure to track them and clean them up when you are done.
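
One way to follow this advice is to collect every subscription you create yourself in one place and release them together. The sketch below assumes only that each subscription exposes an .unsubscribe() method, as RxJS Subscription objects do. The SubscriptionBag class, the UnsubscribeLike interface and the fake subscriptions are hypothetical helpers written for this example; they are not part of CONNECTIVE or RxJS.

```typescript
// Anything with an unsubscribe() method, e.g. an RxJS Subscription.
interface UnsubscribeLike {
  unsubscribe(): void;
}

// Hypothetical helper: remembers subscriptions so they can be released together.
class SubscriptionBag {
  private subs: UnsubscribeLike[] = [];

  // Track a subscription and hand it back, so the call can wrap
  // the expression that created it.
  track<T extends UnsubscribeLike>(sub: T): T {
    this.subs.push(sub);
    return sub;
  }

  // Unsubscribe everything tracked so far and forget it.
  clear(): void {
    for (const sub of this.subs) sub.unsubscribe();
    this.subs = [];
  }

  get size(): number {
    return this.subs.length;
  }
}

// Stand-in subscriptions for the demo. In real code these would come from
// whatever call created the untracked subscription.
let released = 0;
const fakeSubscription = (): UnsubscribeLike => ({
  unsubscribe: () => { released++; },
});

const bag = new SubscriptionBag();
bag.track(fakeSubscription());
bag.track(fakeSubscription());

bag.clear();
console.log(released, bag.size); // prints: 2 0
```

If you already depend on RxJS, its Subscription class has an .add() method that can serve a similar grouping purpose.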

Usage

In most reactive flows you would use one of the more specialized pin types. However, the basic relay behavior of pin() is useful for OR-ing multiple control signals:
/*!*/import { source, pin, gate, group } from '@connectv/core';

let a = source();
let b = source();
let c = source();

let g1 = gate();
let g2 = gate();

group(a, b).to(g1.control);           //--> g1 will let data through when a AND b emit
/*!*/group(a, b).to(pin()).to(g2.control); //--> g2 will let data through when a OR b emit

c.to(g1.input, g2.input);
g1.output.subscribe(v => console.log('G1:: ' + v));
g2.output.subscribe(v => console.log('G2:: ' + v));

c.send(1); a.send(true); b.send(true);
c.send(2); a.send(true);
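
The difference between the two wirings can be sketched without the library. The toy functions below are assumptions made for illustration, not CONNECTIVE's implementation: mergeAny forwards every value from any input, as a relay does, while mergeAll waits until every input has emitted before notifying once and starting a fresh round, which is the behavior described above for control pins.

```typescript
type Handler = (value: string) => void;

// "OR": relay-like merging forwards each value from any input as it arrives.
function mergeAny(inputCount: number, out: Handler): Handler[] {
  const inputs: Handler[] = [];
  for (let i = 0; i < inputCount; i++) inputs.push(value => out(value));
  return inputs;
}

// "AND": control-like merging waits until every input has emitted,
// notifies once, then waits for a fresh round.
function mergeAll(inputCount: number, out: Handler): Handler[] {
  let seen: boolean[] = [];
  let seenCount = 0;
  const inputs: Handler[] = [];
  for (let i = 0; i < inputCount; i++) {
    inputs.push(value => {
      if (!seen[i]) { seen[i] = true; seenCount++; }
      if (seenCount === inputCount) {
        seen = [];
        seenCount = 0;
        out(value);
      }
    });
  }
  return inputs;
}

let anyCount = 0;
let allCount = 0;
const [anyA, anyB] = mergeAny(2, () => { anyCount++; });
const [allA, allB] = mergeAll(2, () => { allCount++; });

// Same order of signals as in the example above: a, then b, then a again.
anyA('a'); allA('a');
anyB('b'); allB('b');
anyA('a'); allA('a');

console.log(anyCount, allCount); // prints: 3 1
```

In this model the OR wiring fires on each of the three signals, while the AND wiring fires only once, after both a and b have emitted.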


Further reading



