Emission and context

Events and data DO NOT move around in their raw form within reactive flows created with CONNECTIVE. Instead, they are wrapped with Emission objects. This allows for adding context to each emission, which for example is used by fork() and join() to track each emission through-out the flow.
/*!*/import { source, map, emission, Emission } from '@connectv/core';

let a = source();
a.to(map(x => x * 2))
/*!*/.observable.subscribe((e: Emission) => {      //--> subscribe to `.observable`
  console.log('RECEIVED EMISSION:: ');        //... to receive the emission
/*!*/  console.log('- value : ' + e.value);        //... instead of just raw value
/*!*/  console.log('- index: ' + e.context.index); //... which also has a context
});

/*!*/a.emit(emission(2, { index: 0 }));   //--> use `emit()` instead of `send()`
/*!*/a.emit(emission(42, { index: 1 }));  //... to specify context.

Context

You can provide the context for each emission using source()'s .emit() method instead of its .send() method:
let s = source();

s.send(42);                                                   //--> no context provided
/*!*/s.emit(emission(42, { thisIs: 'the answer to everything' })); //--> proper context propvided.


You can access the context of the emission (NOT the emission object itself) in various pins and agents, as listed below. You can also modify the context wherever you can access it, as the context is preserved by reference per emission.

Inside map()/filter()

You can access the context inside an async func of a map() or an async func of a filter(), as the fourth parameter:
/*!*/import { source, map, emission, Emission } from '@connectv/core';

let a = source();
/*!*/a.to(map((x, done, err, ctx) => {  //--> access the context via `ctx`         
/*!*/  ctx.index = ctx.index + 1;       //--> shift the index by 1
  done(x * 2);
}))
.observable.subscribe((e: Emission) => {
  console.log('RECEIVED EMISSION:: ');
  console.log('- value : ' + e.value);
  console.log('- index: ' + e.context.index);
});

a.emit(emission(2, { index: 0 }));
a.emit(emission(42, { index: 1 }));

Inside sink()

You can access the context inside a sink() via the second parameter passed to its callback:
import { source, map, sink, emission } from '@connectv/core';

let a = source();
a
.to(map(x => x * 2))
/*!*/.to(sink((v, ctx) => {
/*!*/  console.log(ctx.index + ' :: ' + v);
}))
.subscribe();

a.emit(emission(2, { index: 0 }));
a.emit(emission(42, { index: 1 }));

Inside expr()

Within an expr(), the context is accessible via the rest parameters of your function:
import { source, expr, emission } from '@connectv/core';

let a = source();

/*!*/let e = expr((x, ...[ctx]) => {  //--> get the context from rest params
/*!*/  console.log(ctx.index);
  return x * 2;
});

a.to(e.in(0));
e.result.subscribe(console.log);

a.emit(emission(2, { index: 0 }));
a.emit(emission(42, { index: 1 }));

Inside node()

The context is accessible within node() on the fourth parameter provided to the node function:
import { source, node, emission } from '@connectv/core';

let N = node(
  { inputs: ['i'], outputs: ['o'] },
/*!*/  (inputs, output, err, context) => {
/*!*/    console.log('index:: ' + context.index);
    output('o', inputs.i * 2);
  }
);

let a = source();
let n = N();
a.to(n.in('i'));
n.out('o').subscribe(console.log);

a.emit(emission(2, { index: 0 }));
a.emit(emission(42, { index: 1 }));

Accessing emissions

The simplest way for accessing an emission is to subscribe to the underlying Observable of a Pin instead of subscribing to the pin itself:
let p = pin();

...

p.subscribe(v => ...);           //--> `v` is raw value
/*!*/p.obesrvable.subscribe(e => ...) //--> `e` is emission object
You can then read the emission's value using its .value property and its context using its .context property:
let p = pin();

...

p.observable.subscribe(e => {
/*!*/  let value = e.value;            //--> the value of the emission
/*!*/  let ctx = e.context;            //--> the context of the emission
});


Pipes

RxJS's pipeable operators combined with pipe() pin will receive Emission observables and are expected to return emission observables (Observable<Emission>):
import { value, spread, pipe } from '@connectv/core';
import { timer } from 'rxjs';
/*!*/import { delayWhen } from 'rxjs/operators';

value([1, 2, 3, 4])
.to(spread())
/*!*/.to(pipe(delayWhen(e => timer(1000 - e.value * 10)))) //--> delay proportional to inverse of the value 
/*!*/                                                      //... so that the array is reversed
.subscribe(console.log);
You can read more about this here.

Modifying emissions

The value of each emission is constant. If you want to modify the value of the emission through-out its life-cylce, you can use its .fork() method, which will create a new emission with the same context and new value:
import { emission } from '@connectv/core';

let e = emission(42, { thisIs: "the ultimate answer"});

/*!*/let e2 = e.fork(21); //--> HERESEY!!
console.log(e2);

Merging emissions

In many cases multiple emissions come from different sources and are merged into one new emission. If you need to merge emissions on your self, you can use the static function Emission.from():
import { emission, Emission } from '@connectv/core';

let e1 = emission(42, { someKey: 'some value' });
let e2 = emission(31, { someOtherKey: 'some other value' });

/*!*/let merged = Emission.from([e1, e2]);
console.log(merged.value);
console.log(merged.context);
When merged, the context of the emission's will be merged. If a key is shared in multiple emission contexts with different values (by reference), the merged emission will replace the value with an instance of MergedEmissionContextVal, which contains all of the merged values:
/*!*/import { emission, Emission, MergedEmissionContextVal } from '@connectv/core';

let e1 = emission(42, { a: 2, b: 3, c: 5 });  //--> a is repeated with the same value,
let e2 = emission(31, { a: 2, b: 4, d : 6 }); //... b is repeated with different values.

let merged = Emission.from([e1, e2]);
console.log(merged.value);
console.log(merged.context);
/*!*/console.log(merged.context.b instanceof MergedEmissionContextVal);
/*!*/console.log(merged.context.b.values); //--> the merged values for b


Further reading




Copied to Clipboard!