@@ -587,7 +587,7 @@ function createContext(context, value, path?) {
587587 execution : {
588588 ...context . execution ,
589589 operatorId : context . execution . operatorId + 1 ,
590- path : path ? context . execution . path . concat ( path ) : context . execution . path ,
590+ path : path || context . execution . path ,
591591 } ,
592592 }
593593}
@@ -671,7 +671,7 @@ export function forEach<Input extends any[]>(
671671
672672 if ( ! evaluatingCount ) {
673673 stopDebugOperator ( lastContext )
674- next ( null , context )
674+ next ( null , lastContext )
675675 }
676676 }
677677 startDebugOperator ( 'forEach' , '' , context )
@@ -680,7 +680,7 @@ export function forEach<Input extends any[]>(
680680 lastContext = createContext (
681681 lastContext || context ,
682682 value ,
683- String ( index )
683+ context . execution . path . concat ( String ( index ) )
684684 )
685685 const nextWithPath = createNextPath ( evaluate )
686686 forEachItemOperator ( null , lastContext , nextWithPath )
@@ -692,6 +692,48 @@ export function forEach<Input extends any[]>(
692692 return instance
693693}
694694
695+ export function parallel < Input > (
696+ operators : Operator < Input , any > [ ]
697+ ) : Operator < Input , Input > {
698+ const instance = ( err , context , next ) => {
699+ if ( err ) next ( err )
700+ else {
701+ let evaluatingCount = operators . length
702+ let lastContext
703+ let hasErrored = false
704+ const evaluate = ( err ) => {
705+ if ( hasErrored ) {
706+ return
707+ }
708+ if ( err ) {
709+ hasErrored = true
710+ return next ( err , lastContext )
711+ }
712+ evaluatingCount --
713+
714+ if ( ! evaluatingCount ) {
715+ stopDebugOperator ( lastContext )
716+ next ( null , lastContext )
717+ }
718+ }
719+ startDebugOperator ( 'parallel' , '' , context )
720+
721+ operators . forEach ( ( operator , index ) => {
722+ lastContext = createContext (
723+ lastContext || context ,
724+ context . value ,
725+ context . execution . path . concat ( String ( index ) )
726+ )
727+ const nextWithPath = createNextPath ( evaluate )
728+ operator ( null , lastContext , nextWithPath )
729+ } )
730+ }
731+ }
732+ instance [ IS_PIPE ] = true
733+
734+ return instance
735+ }
736+
695737export function filter < Input > (
696738 operation : ( input : Context < Input > ) => boolean
697739) : Operator < Input , Input > {
@@ -751,7 +793,11 @@ export function fork<
751793 else {
752794 startDebugOperator ( 'fork' , operation , context )
753795 const path = operation ( context )
754- const newContext = createContext ( context , context . value , path )
796+ const newContext = createContext (
797+ context ,
798+ context . value ,
799+ context . execution . path . concat ( path )
800+ )
755801 const nextWithPaths = createNextPath ( ( err , returnedContext ) => {
756802 if ( err ) next ( err )
757803 else {
@@ -779,12 +825,20 @@ export function when<Input, OutputA, OutputB>(
779825 else {
780826 startDebugOperator ( 'when' , operation , context )
781827 if ( operation ( context ) ) {
782- const newContext = createContext ( context , context . value , 'true' )
828+ const newContext = createContext (
829+ context ,
830+ context . value ,
831+ context . execution . path . concat ( 'true' )
832+ )
783833 const nextWithPath = createNextPath ( next )
784834 stopDebugOperator ( newContext )
785835 paths . true ( null , newContext , nextWithPath )
786836 } else {
787- const newContext = createContext ( context , context . value )
837+ const newContext = createContext (
838+ context ,
839+ context . value ,
840+ context . execution . path . concat ( 'false' )
841+ )
788842 const nextWithPath = createNextPath ( next )
789843 stopDebugOperator ( newContext )
790844 paths . false ( null , newContext , nextWithPath )
0 commit comments