Fork / Join

Sometimes you want to do several things in parallel with an incoming event or piece of data, then gather the results and act on them. For example, in a login flow you need to check the user's database record (e.g. to see whether they exist and whether their password matches), and you also need to generate a token for them, such as a JWT. Generating the token does not require the data from the database, so you can do these two actions in parallel, leading to a flow like this:
import { source, map, control } from '@connectv/core';


let loginReq = source();                              //--> emulates incoming requests
let generateToken = map(user => `token for ${user}`); //--> generate a token for each user
let authenticate = map((user, done) => {              //--> check if user is authenticated
  if (user == 'john')                                 //--> if the user is john ...
    setTimeout(() => done(`auth for ${user}`), 100);  //... it will take a bit to process it ...
  else done(`auth for ${user}`);                      //... but not for others
});

loginReq
.to(generateToken, authenticate)                      //--> so check auth and generate token in parallel
.to(control())                                        //--> gather the results
.subscribe(console.log);                              //--> see what happened

loginReq.send('john');                                //--> first john sends a login request
loginReq.send('jack');                                //--> then jack does
If you run this code, you'll see that the output will look like this:
> ["token for john", "auth for jack"]
> ["token for jack", "auth for john"]
What happens is:
  1. generateToken first emits "token for john" and then "token for jack".
  2. authenticate first emits "auth for jack" and then "auth for john", because processing "john" takes a bit longer.
  3. control() simply pairs the first emission of each pin, and then the second emission of each pin, which results in the wrong pairing.
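The mismatch can be reproduced without the library. The following is a minimal sketch in plain JavaScript, assuming the order-based pairing described in the steps above; `pairByOrder` is a hypothetical helper written for illustration and is not part of @connectv/core.

```javascript
// Hypothetical helper (not part of @connectv/core): pairs emissions purely
// by the order in which they arrived on each pin.
function pairByOrder(first, second) {
  const length = Math.min(first.length, second.length);
  const pairs = [];
  for (let i = 0; i < length; i++) pairs.push([first[i], second[i]]);
  return pairs;
}

// Arrival order on each pin, as listed in the steps above.
const tokenEmissions = ['token for john', 'token for jack'];
const authEmissions = ['auth for jack', 'auth for john']; // john's auth is delayed

const orderPairs = pairByOrder(tokenEmissions, authEmissions);
console.log(orderPairs);
```

Because nothing ties an emission back to the request that caused it, arrival order is the only information available for pairing, and a single slow branch is enough to cross the results.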


fork() allows you to stamp incoming emissions, so you can pair them later based on this stamp using join():
import { source, map, fork, join } from '@connectv/core';


let loginReq = source();                              //--> emulates incoming requests
let generateToken = map(user => `token for ${user}`); //--> generate a token for each user
let authenticate = map((user, done) => {              //--> check if user is authenticated
  if (user == 'john')                                 //--> if the user is john ...
    setTimeout(() => done(`auth for ${user}`), 100);  //... it will take a bit to process it ...
  else done(`auth for ${user}`);                      //... but not for others
});

loginReq
.to(fork())                                           //--> stamp the request
.to(generateToken, authenticate);                     //--> so check auth and generate token in parallel

let j = join('token', 'auth');                        //--> join() is an agent
generateToken.to(j.in('token'));                      //--> generateToken goes to 'token' field
authenticate.to(j.in('auth'));                        //--> authenticate goes to 'auth' field
j.output.subscribe(console.log);                      //--> see the joined objects

loginReq.send('john');                                //--> first john sends a login request
loginReq.send('jack');                                //--> then jack does
Now the result becomes:
> { token: "token for jack", auth: "auth for jack" }
> { token: "token for john", auth: "auth for john" }
Note that while fork() is a pin, join() is an agent. Unlike control() or pack(), it will generate an object with given fields, and will have one input pin for each of those fields.
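To make the idea of stamp-based pairing concrete, here is a rough sketch in plain JavaScript. It is not how @connectv/core implements fork() or join(); `stampValue` and `createJoiner` are hypothetical stand-ins, and the stamp is assumed to be a simple counter.

```javascript
// Hypothetical stand-ins for illustration only; not the @connectv/core implementation.
let nextStamp = 0;

// Stamp an incoming value, roughly what the text says fork() does.
function stampValue(value) {
  return { value, stamp: nextStamp++ };
}

// Build a joiner that emits an object once every field has arrived for a stamp.
function createJoiner(fields, onJoined) {
  const pending = new Map(); // stamp -> partially filled object
  return function receive(field, value, stamp) {
    const partial = pending.get(stamp) || {};
    partial[field] = value;
    if (fields.every(name => name in partial)) {
      pending.delete(stamp);
      onJoined(partial);
    } else {
      pending.set(stamp, partial);
    }
  };
}

const joinedResults = [];
const receive = createJoiner(['token', 'auth'], result => joinedResults.push(result));

const john = stampValue('john');
const jack = stampValue('jack');

// Same arrival order as in the example: john's auth result comes in last.
receive('token', `token for ${john.value}`, john.stamp);
receive('token', `token for ${jack.value}`, jack.stamp);
receive('auth', `auth for ${jack.value}`, jack.stamp);
receive('auth', `auth for ${john.value}`, john.stamp);

console.log(joinedResults);
```

In this sketch the object for jack is completed first, since both of his results arrive before john's delayed auth result, and each object only ever combines values that carry the same stamp.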

Chain forks

The stamp that fork() puts on emissions is actually a stack, so you can have multiple forks and joins simultaneously. Each fork() pushes a new stamp onto this stack, and each join() matches emissions whose stamps are equal throughout the whole stack (not just the last stamp). When join() emits, it also pops the last stamp.

If you want to join emissions without popping the stamp, for example so that you can later join them again with other emissions of the same fork, simply use peekJoin() instead of join().
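The stack behaviour can be modelled in a few lines of plain JavaScript. This is only a sketch of the semantics described above, not the library's code: `pushStamp` and `joinPair` are hypothetical names, stamps are assumed to be counter values, and the `pop` flag stands in for the difference between join() and peekJoin().

```javascript
// Hypothetical model of a stamp stack; names are illustrative, not @connectv/core API.
let stampCounter = 0;

// Push a fresh stamp onto a copy of the emission's stack (what a fork would do).
const pushStamp = emission => ({
  value: emission.value,
  stamps: [...emission.stamps, stampCounter++],
});

// Two stacks match only if they are equal all the way through.
const sameStamps = (a, b) => a.length === b.length && a.every((s, i) => s === b[i]);

// Join two emissions; with pop: true the last stamp is removed from the result.
function joinPair(a, b, { pop }) {
  if (!sameStamps(a.stamps, b.stamps)) return undefined; // different forks, no match
  return {
    value: [a.value, b.value],
    stamps: pop ? a.stamps.slice(0, -1) : a.stamps.slice(),
  };
}

const request = { value: 'john', stamps: [] };
const outer = pushStamp(request);            // stamps: [0]
const inner = pushStamp(outer);              // stamps: [0, 1]
const branchA = { ...inner, value: 'A' };    // two branches of the same inner fork
const branchB = { ...inner, value: 'B' };
const stranger = pushStamp(outer);           // stamps: [0, 2], a different inner fork

const popped = joinPair(branchA, branchB, { pop: true });    // join-like: stamps [0]
const peeked = joinPair(branchA, branchB, { pop: false });   // peekJoin-like: stamps [0, 1]
const mismatch = joinPair(branchA, stranger, { pop: true }); // undefined

console.log(popped.stamps, peeked.stamps, mismatch);
```

Keeping the last stamp on the peeked result is what would allow it to be matched again against other emissions that still carry the inner fork's stamp.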

Further reading



