Invoke

Use invoke() when you need a fresh instance of a sub-flow to properly handle each incoming set of inputs.

For example, imagine the following composition:
  import { composition, map, pipe, spread, reduce, pin, handleError } from '@connectv/core';
import { ajax } from 'rxjs/ajax';
import { debounceTime } from 'rxjs/operators';

//--> this composition will calculate all of stargazers
//--> all the public repos of a user have on github.
export const stargazers = composition(() => {
  let username = pin();

  let handler = handleError();

  let stars = username
  .to(map((username, done, error) => {
    ajax.getJSON(
      `https://api.github.com/users/${username}/repos` //--> GitHub API!
    ).subscribe(done, error);                          //--> pass on the callbacks
  }))
  .to(handler)                                         //--> let errors be handled
  .to(spread())                                        //--> spread individual repos
  .to(map(repo => repo.stargazers_count))              //--> get the stargazers of the repo
/*!*/  .to(reduce((total, stars) => total + stars))         //--> aggregate everything
  .to(pipe(debounceTime(1)))                           //--> wait till aggregation is done

  return [{ username }, { stars, notFound: handler.error }];
});
Using this composition usually looks like this:
  import { wrap, map, pipe, filter, sink } from '@connectv/core';
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

import { stargazers } from './stargazers';

let i = document.getElementById('i') as HTMLInputElement;
let p = document.getElementById('p');

wrap(fromEvent(i, 'input'))
.to(map(() => i.value))
.to(filter(v => v))
.to(pipe(debounceTime(1000)))
//
// PROBLEM:: we need a fresh instance of Stargazers so that we don't
// get the aggregate of all users we tried ...
//
/*!*/.to(stargazers())
.serialTo(
  sink(v => p.textContent = v),
  sink(() => p.textContent = 'NOT FOUND')
)
.subscribe();
Now if you enter multiple usernames, you will see that the output you are getting is the aggregate of the total number of stars for ALL of the entered users. This is because we have one instance of stargazers() in our flow, so it will aggregate the stars of all incoming users.

To fix this, you can use invoke() to create a new instance of stargazers() per username, hence only getting the aggregate number of stars for that user:
  /*!*/import { wrap, map, pipe, filter, invoke, sink } from '@connectv/core';
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

import { stargazers } from './stargazers';

let i = document.getElementById('i') as HTMLInputElement;
let p = document.getElementById('p');

wrap(fromEvent(i, 'input'))
.to(map(() => i.value))
.to(filter(v => v))
.to(pipe(debounceTime(1000)))
//
// PROBLEM FIXED: invoke() will create a new instance of
// Stargazers() for each username.
//
/*!*/.to(invoke(stargazers))
.serialTo(
  sink(v => p.textContent = v),
  sink(() => p.textContent = 'NOT FOUND')
)
.subscribe();

Behavior

invoke() behaves just like a node():

Signature

invoke() will have the same signature as the agent that the factory function returns. You can override this by providing a signature as the second argument.

Construction purity

invoke() will call your factory function at least once before any execution, to collect the signature. This means that your factory function MUST be pure, i.e. resulting in the same thing every time it is executed, and not changing the state of the overall program. If not, this might lead to unpredictable flow behavior.

This also means that your agent's construction process also MUST be pure. If, for any reason, you have an agent whose construction is not pure, or your factory function cannot be pure, provide the signature explicitly.

Further reading




Copied to Clipboard!