Pipe

pipe() allows you to use any of RxJS's pipeable operators in your reactive flows:
import { wrap, pipe, map } from '@connectv/core';
import { fromEvent } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

let a = document.getElementById('a') as HTMLInputElement;
let p = document.getElementById('p');

wrap(fromEvent(a, 'input'))
  .to(map(() => a.value))             //--> get the input value
  .to(pipe(throttleTime(1000)))       //--> throttle a bit
  .subscribe(v => p.innerHTML = v);

Emissions

Operators passed to pipe() do not receive the raw emitted data or events. They receive Emission objects that wrap them:
import { value, spread, pipe } from '@connectv/core';
import { tap } from 'rxjs/operators';

value([1, 2, 3, 4])
  .to(spread())
  .subscribe(console.log);      //--> values are logged

value([1, 2, 3, 4])
  .to(spread())
  .to(pipe(tap(console.log)))   //--> `Emission` objects are logged
  .subscribe();

In many cases you don't need to work with incoming emissions directly. When you do, you can read the emission's actual value from its .value property:
import { value, spread, pipe } from '@connectv/core';
import { timer } from 'rxjs';
import { delayWhen } from 'rxjs/operators';

value([1, 2, 3, 4])
  .to(spread())
  .to(pipe(delayWhen(e => timer(1000 - e.value * 10)))) //--> larger values get shorter delays,
                                                        //... so the array is emitted in reverse
  .subscribe(console.log);

The operators should also return observables that emit emissions (Observable<Emission>). Use the .fork() method on an incoming emission to create a new emission with a new value, or the Emission.from() function to create a single emission from multiple incoming emissions.
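
The contract described above (emissions in, emissions out) can be sketched without the library. This is a minimal illustration only: `SketchEmission`, its `context` field, and `doubleAll` are hypothetical stand-ins invented for this example, not the library's actual `Emission` class or API. It only shows the idea of forking an incoming emission to carry a new value.

```typescript
// Hypothetical stand-in for an emission wrapper, for illustration only.
// The `context` field is an assumption, not taken from the library.
class SketchEmission<T> {
  constructor(
    readonly value: T,
    readonly context: Record<string, unknown> = {},
  ) {}

  // Mirrors the idea of `.fork()`: a new emission with a new value
  // that keeps the metadata of the emission it was derived from.
  fork<U>(value: U): SketchEmission<U> {
    return new SketchEmission(value, this.context);
  }
}

// An operator-like function: it receives emissions and returns emissions,
// never raw values.
function doubleAll(emissions: SketchEmission<number>[]): SketchEmission<number>[] {
  return emissions.map(e => e.fork(e.value * 2));
}

const incoming = [1, 2, 3, 4].map(v => new SketchEmission(v, { source: 'demo' }));
const outgoing = doubleAll(incoming);
const outgoingValues = outgoing.map(e => e.value);

console.log(outgoingValues);   // doubled values: 2, 4, 6, 8
```

With the real library, the same shape would presumably be written by passing an RxJS operator to pipe(), for example RxJS's `map` from 'rxjs/operators' with a callback such as `e => e.fork(e.value * 2)`, so that the operator still returns emissions rather than bare values.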




