Filter

Filter blocks some emissions while letting others through, using the provided function to decide.
Similar to map(), this function can be a simple sync function, in which case it needs to take one argument:
import { wrap, map, filter } from '@connectv/core';
import { fromEvent } from 'rxjs';

let a = document.getElementById('a') as HTMLInputElement;

wrap(fromEvent(a, 'input'))       //--> get input events on the input
 .to(map(() => a.value))          //--> map it to input's value
 .to(filter(x => x.length > 3))   //--> only allow long enough text
 .subscribe(console.log);         //--> log it
Or it can be an asynchronous one, in which case it needs to take a callback as its second argument:
import { wrap, map, filter } from '@connectv/core';
import { fromEvent } from 'rxjs';

let a = document.getElementById('a') as HTMLInputElement;

wrap(fromEvent(a, 'input'))             //--> get input events on the input
 .to(map(() => a.value))                //--> map it to input's value
 .to(filter((x, done) => {              //--> only allow long enough text
   setTimeout(() => done(x.length > 3),
    10000 - x.length * 200);            //--> also wait a bit
 }))
 .subscribe(console.log);               //--> log it
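
The two call shapes above differ only in how many parameters the function declares. The following is a minimal, library-free sketch of that contract. `runPredicate`, `SyncPredicate` and `AsyncPredicate` are hypothetical names made up for illustration, and telling the two shapes apart by `fn.length` is an assumption, not a description of how CONNECTIVE works internally.

```typescript
// Hypothetical types for illustration; not part of @connectv/core.
type SyncPredicate<T> = (value: T) => boolean;
type AsyncPredicate<T> = (value: T, done: (pass: boolean) => void) => void;

// Runs either kind of predicate and reports the verdict through `done`.
// Assumption: a function declaring at most one parameter is treated as sync.
function runPredicate<T>(
  fn: SyncPredicate<T> | AsyncPredicate<T>,
  value: T,
  done: (pass: boolean) => void,
): void {
  if (fn.length <= 1) done((fn as SyncPredicate<T>)(value));
  else (fn as AsyncPredicate<T>)(value, done);
}

// Sync shape: returns the verdict directly.
const longEnough: SyncPredicate<string> = x => x.length > 3;

// Async shape: hands the verdict to the callback some time later.
const longEnoughLater: AsyncPredicate<string> = (x, done) => {
  setTimeout(() => done(x.length > 3), 10);
};

runPredicate(longEnough, 'hello', pass => console.log('sync:', pass));
runPredicate(longEnoughLater, 'hi', pass => console.log('async:', pass));
```

In both cases the caller receives the verdict the same way, which is why a sync and an async filter function can be swapped without changing the rest of the flow.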


Error handling

filter() supports error handling exactly the same way that map() does, i.e. in sync filter functions you can just throw the errors:
import { wrap, map, filter } from '@connectv/core';
import { fromEvent } from 'rxjs';

let a = document.getElementById('a') as HTMLInputElement;

wrap(fromEvent(a, 'input'))             //--> get input events on the input
 .to(map(() => a.value))                //--> map it to input's value
 .to(filter(x => {                      //--> only allow long enough text
   if (x === 'end')
     throw new Error('END!');           //--> throw error when 'end' is nigh
   return x.length > 3;
 }))
 .subscribe(
   console.log,                         //--> log values
   (err) =>
     console.log(err.original.message)  //--> ERROR!
 );
In async filter functions, you have to catch the error yourself and pass it to the error callback, which is the third argument passed to the filter function:
import { wrap, map, filter } from '@connectv/core';
import { fromEvent, throwError } from 'rxjs';

let a = document.getElementById('a') as HTMLInputElement;

//--> an observable that always errors; an Error object is used
//--> so that `.message` is available in the error handler below
let $badObs = throwError(new Error('END IS NIGH!'));

wrap(fromEvent(a, 'input'))             //--> get input events on the input
 .to(map(() => a.value))                //--> map it to input's value
 .to(filter((x, done, err) => {         //--> only allow long enough text
   $badObs.subscribe(
     () => done(x.length > 3),
     error => err(error),               //--> pass the error to the error callback
   )
 }))
 .subscribe(
   console.log,                         //--> log values
   (err) =>
     console.log(err.original.message)  //--> ERROR!
 );
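
The reason the async form needs an explicit error callback can be seen in a small library-free sketch. Everything here is hypothetical and written only for illustration: `runGuarded`, `WrappedError` and the type names are not CONNECTIVE APIs, and the `{ original }` shape only mirrors the `err.original` access used in the examples above.

```typescript
// Hypothetical shapes for illustration; not part of @connectv/core.
type WrappedError = { original: Error };
type Done = (pass: boolean) => void;
type Fail = (error: Error) => void;
type Predicate<T> = (value: T, done: Done, fail: Fail) => void;

// Runs a predicate so that thrown errors and errors passed to `fail`
// both reach `onError`, wrapped with the original error attached.
function runGuarded<T>(
  fn: Predicate<T>,
  value: T,
  onPass: Done,
  onError: (err: WrappedError) => void,
): void {
  const fail: Fail = error => onError({ original: error });
  try {
    fn(value, onPass, fail);
  } catch (e) {
    // Only errors thrown while `fn` is still running can be caught here.
    fail(e instanceof Error ? e : new Error(String(e)));
  }
}

// Throws synchronously, like the sync filter example.
const throwing: Predicate<string> = (x, done) => {
  if (x === 'end') throw new Error('END!');
  done(x.length > 3);
};

// Reports the error through the third argument, like the async example.
const reporting: Predicate<string> = (x, done, fail) => {
  if (x === 'end') fail(new Error('END IS NIGH!'));
  else done(x.length > 3);
};

runGuarded(throwing, 'end', console.log, err => console.log(err.original.message));
runGuarded(reporting, 'end', console.log, err => console.log(err.original.message));
```

A `try`/`catch` around the call only sees errors raised before the function returns. An error that occurs later, inside a timer or a subscription, never passes through it, so the function has to hand the error over explicitly.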


Function purity

Similar to map(), sync filter functions MUST be pure, i.e. they must return the same result for the same input. Non-pure sync functions lead to unpredictable flow behavior, so if you have a non-pure function, wrap it in an async filter function.
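
As a minimal sketch of the distinction, the plain functions below are hypothetical examples (`isLong`, `everyOther` and `everyOtherAsync` are illustrative names, not library APIs). The last one shows the callback form that the paragraph above recommends for non-pure checks.

```typescript
// Pure: the verdict depends only on the input.
const isLong = (x: string): boolean => x.length > 3;

// Non-pure: the verdict also depends on a counter that changes on every call.
let calls = 0;
const everyOther = (_x: string): boolean => ++calls % 2 === 0;

// The non-pure check wrapped in the async (callback) form.
const everyOtherAsync = (x: string, done: (pass: boolean) => void): void => {
  done(everyOther(x));
};

console.log(isLong('hello'), isLong('hello'));         // true true
console.log(everyOther('hello'), everyOther('hello')); // false true
```

The same input gives `everyOther` two different verdicts, which is exactly the property a sync filter function is not allowed to have. `everyOtherAsync` would be passed to filter() in place of the sync version.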

For more details on how function purity is handled by CONNECTIVE, you can read this entry.

Further reading



