MyFluxTest.java:
import java.util.ArrayList;
import java.util.List;public class MyFluxTest {public static void main(String[] args) {List<String> names = new ArrayList<>();names.add( "张三" );names.add( "李四" );names.add( "王五" );MyFlux flux_name = MyFlux.fromList(names);MyFlux flux_name1 = flux_name.map(name -> {return name + " 111 ";});MyFlux flux_name2 = flux_name1.map(name -> {return name + " 222 ";});MyFlux flux_name3 = flux_name2.map(name -> {return name + " 333 ";});flux_name3.subscribe( name->{System.out.println( name );} );}
}
MyConsumer.java:
/**
 * Callback that receives one string element at a time.
 *
 * <p>Declares a single abstract method, so it can be implemented with a
 * lambda expression or a method reference.
 */
@FunctionalInterface
public interface MyConsumer {

    /**
     * Handles a single element.
     *
     * @param element the element to handle
     */
    void consume(String element);
}
MyFlux.java:
import lombok.Getter;
import lombok.Setter;import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * A minimal, lazily evaluated pipeline over a list of strings.
 *
 * <p>A pipeline is a chain of {@code MyFlux} nodes linked through
 * {@code prevFlux}. The first node holds the source elements; every node
 * returned by {@link #map(MyFunction)} holds one transformation. No
 * transformation runs until {@link #subscribe(MyConsumer)} is called.
 */
@Getter
@Setter
public class MyFlux implements Serializable {

    // Only the initial flux (the one whose prevFlux is null) holds elements here.
    private List<String> innerList;

    // Upstream node this flux was derived from; null on the initial flux.
    private MyFlux prevFlux;

    // Node created by the most recent map call on this flux; not read by subscribe.
    private MyFlux nextFlux;

    // Transformation stored on the flux that map returned; null on a flux built by fromList.
    private MyFunction function;

    // Not assigned by any method of this class; only reachable through its accessor.
    private MyConsumer consumer;

    /**
     * Creates the initial flux of a pipeline from the given elements.
     *
     * <p>The elements are copied, so later changes to {@code list} do not
     * affect the returned flux.
     *
     * @param list the source elements; must not be {@code null}
     * @return a new flux with no upstream node holding a copy of {@code list}
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public static MyFlux fromList(List<String> list) {
        MyFlux source = new MyFlux();
        // prevFlux and nextFlux keep their default null values on a fresh instance.
        source.innerList = new ArrayList<>(list);
        return source;
    }

    /**
     * Appends a transformation stage to this flux.
     *
     * <p>The function is only recorded here; it is applied when
     * {@link #subscribe(MyConsumer)} is called on the returned flux or on a
     * flux derived from it.
     *
     * @param function the transformation to apply to every element
     * @return a new flux whose upstream node is this flux, or {@code null}
     *         if {@code function} is {@code null}
     */
    public MyFlux map(MyFunction function) {
        if (function == null) {
            return null;
        }
        MyFlux next = new MyFlux();
        next.setPrevFlux(this);
        next.setFunction(function);
        this.setNextFlux(next);
        return next;
    }

    /**
     * Runs the pipeline that ends at this flux and delivers every result.
     *
     * <p>Each source element is passed through the recorded functions in the
     * order the stages were chained (upstream first) and the final value is
     * handed to {@code consumer}. If the initial flux holds no element list,
     * nothing is delivered.
     *
     * @param consumer receives each transformed element; if it is
     *                 {@code null}, a {@link NullPointerException} is thrown
     *                 when the first element is delivered
     */
    public void subscribe(MyConsumer consumer) {
        // Walk upstream iteratively so very long chains cannot overflow the stack.
        // Functions are collected from this flux back to the initial one.
        List<MyFunction> stages = new ArrayList<>();
        MyFlux source = this;
        for (MyFlux node = this; node != null; node = node.prevFlux) {
            if (node.function != null) {
                stages.add(node.function);
            }
            source = node;
        }

        List<String> elements = source.innerList;
        if (elements == null) {
            // The chain does not start at a flux that holds elements.
            return;
        }

        for (String element : elements) {
            String value = element;
            // Stages were collected downstream-first, so apply them in reverse.
            for (int i = stages.size() - 1; i >= 0; i--) {
                value = stages.get(i).apply(value);
            }
            consumer.consume(value);
        }
    }
}
MyFunction.java:
/**
 * Transformation from one string to another.
 *
 * <p>Declares a single abstract method, so it can be implemented with a
 * lambda expression or a method reference.
 */
@FunctionalInterface
public interface MyFunction {

    /**
     * Transforms a single element.
     *
     * @param element the input element
     * @return the transformed element
     */
    String apply(String element);
}