首页 > 科技 > 反应式微服务框架Flower

反应式微服务框架Flower

Flower是一个构建在Akka上的反应式微服务框架,开发者只需要针对每一个细粒度的业务功能开发一个Service服务,并将这些Service按照业务流程进行可视化编排,即可得到一个反应式系统。

  • 即时响应:服务流程的调用者可以得到即时响应,无需等待整个Service流程执行完毕;Service之间无调用阻塞,即时响应。
  • 回弹性:当Service失效、服务器失效,系统能够进行自修复,依然保持响应,不会出现系统崩溃。
  • 弹性:能够对调用负载压力做出响应,能够自动进行资源伸缩适应负载压力,能够根据系统负载能力控制请求的进入速度(回压)。
  • 消息驱动:Service之间通过消息驱动,完成服务流程,Service之间没有任何调用耦合,唯一的耦合就是消息,前一个Service的返回值,必须是后一个Service的输入参数,Flower框架负责将前一个Service的返回值封装成一个消息,发送给后一个Service。

Flower既是一个反应式编程框架,又是一个分布式微服务框架。

Flower框架使得开发者无需关注反应式编程细节,即可得到一个反应式系统。

快速上手

Flower框架的主要元素包括:Flower Service(服务)、Flower 流程和Flow容器。Service实现一个细粒度的服务功能,Service之间通过Message关联,前一个Service的返回值(Message),必须是后一个Service的输入参数(Message),Service按照业务逻辑编辑成一个Flow(流程),Flower容器负责将前一个Service的返回消息,传递给后一个Service。

安装

Maven


tcom.ly.train
tflower.core
tA.B.C

Gradle

compile group: 'com.ly.train', name: 'flower.core', version: 'A.B.C'

SBT

libraryDependencies += "com.ly.train" % "flower.core" % "A.B.C"

Ivy


Flower初始化

Flower使用前需要进行初始化,这里演示最简单的方式。

Flower初始化

 FlowerFactory flowerFactory = new SimpleFlowerFactory();

定义Flower服务

开发Service类必须实现Flower框架的Service接口或者继承AbstractService基类,在process方法内完成服务业务逻辑处理。

UserServiceA

public class UserServiceA implements Service {
static final Logger logger = LoggerFactory.getLogger(UserServiceA.class);
@Override
public User process(User message, ServiceContext context) throws Throwable {
message.setDesc(message.getDesc() + " --> " + getClass().getSimpleName());
message.setAge(message.getAge() + 1);
logger.info("结束处理消息, message : {}", message);
return message;
}
}

UserServiceB

public class UserServiceB implements Service {
static final Logger logger = LoggerFactory.getLogger(UserServiceB.class);
@Override
public User process(User message, ServiceContext context) throws Throwable {
message.setDesc(message.getDesc() + " --> " + getClass().getSimpleName());
message.setAge(message.getAge() + 1);
logger.info("结束处理消息, message : {}", message);
return message;
}
}

UserServiceC1

public class UserServiceC1 implements Service {
static final Logger logger = LoggerFactory.getLogger(UserServiceC1.class);
@Override
public User process(User message, ServiceContext context) throws Throwable {
message.setDesc(message.getDesc() + " --> " + getClass().getSimpleName());
message.setAge(message.getAge() + 1);
logger.info("结束处理消息, message : {}", message);
return message;
}
}

服务注册

Flower提供两种服务注册方式:配置文件方式和编程方式。

  • 编程方式
 ServiceFactory serviceFactory = flowerFactory.getServiceFactory();
serviceFactory.registerService(UserServiceA.class.getSimpleName(), UserServiceA.class);
serviceFactory.registerService(UserServiceB.class.getSimpleName(), UserServiceB.class);
serviceFactory.registerService(UserServiceC1.class.getSimpleName(), UserServiceC1.class);
  • 配置文件方式 服务定义配置文件扩展名: .services,放在classpath下,Flower框架自动加载注册。 flower_test.services
UserServiceA = com.ly.train.flower.base.service.user.UserServiceA
UserServiceB = com.ly.train.flower.base.service.user.UserServiceB
UserServiceC1 = com.ly.train.flower.base.service.user.UserServiceC1

服务流程编排

Flower框架提供两种服务流程编排方式:配置文件方式和编程方式。

两种编排方式的结果是一样:

UserServiceA -> UserServiceB -> UserServiceC1
  • 编程方式编排流程
// UserServiceA -> UserServiceB -> UserServiceC1
final String flowName = "flower_test";
ServiceFlow serviceFlow = serviceFactory.getOrCreateServiceFlow(flowName);
serviceFlow.buildFlow(UserServiceA.class, UserServiceB.class);
serviceFlow.buildFlow(UserServiceB.class, UserServiceC1.class);
serviceFlow.build();
  • 配置文件方式编排流程 流程配置文件扩展名: .flow,放在classpath下,Flower框架自动加载编排流程。 flower_test.flow
UserServiceA -> UserServiceB
UserServiceB -> UserServiceC1

调用Flower流程

前面定义了3个Flower服务,并编排了名称为flower_test的服务流程。那么怎么使用它呢?

  1. 同步调用,需要Flower服务流程响应结果。
  2. 异步调用,不需要Flower服务流程响应结果。
  • 同步调用
final FlowRouter flowRouter = flowerFactory.buildFlowRouter(flowName, 16);
Object result = flowRouter.syncCallService(user);
  • 异步调用
final FlowRouter flowRouter = flowerFactory.buildFlowRouter(flowName, 16);
flowRouter.asyncCallService(user);

完整示例

 FlowerFactory flowerFactory = new SimpleFlowerFactory();
ServiceFactory serviceFactory = flowerFactory.getServiceFactory();
serviceFactory.registerService(UserServiceA.class.getSimpleName(), UserServiceA.class);
serviceFactory.registerService(UserServiceB.class.getSimpleName(), UserServiceB.class);
serviceFactory.registerService(UserServiceC1.class.getSimpleName(), UserServiceC1.class);
final String flowName = "flower_test";
ServiceFlow serviceFlow = serviceFactory.getOrCreateServiceFlow(flowName);
serviceFlow.buildFlow(UserServiceA.class, UserServiceB.class);
serviceFlow.buildFlow(UserServiceB.class, UserServiceC1.class);
serviceFlow.build();
final FlowRouter flowRouter = flowerFactory.buildFlowRouter(flowName, 16);
User user = new User();
user.setName("响应式编程 ");
user.setAge(2);
Object o = flowRouter.syncCallService(user);
System.out.println("响应结果: " + o);

flowRouter.asyncCallService(user);

Flower应用指南

在Flower里面消息是一等公民,基于Flower开发的应用系统是面向消息的应用系统。 消息由Service产生,是Service的返回值;同时消息也是Service的输入。前一个Service的返回消息是下一个Service的输入消息,没有耦合的Service正是通过消息关联起来,组成一个Service流程,并最终构建出一个拥有完整处理能力的应用系统。流程举例:

// -> service1 -> service2 -> service5 -> service4
// ^ | ^ |
// | -> service3 -| |
// |___________________________________|

术语

  • 服务:即Service,Flower里完成业务逻辑的最小单位,服务的粒度可以很大,比如完成整个http请求处理计算可以封装在一个服务里;服务的粒度也可以很小,比如仅仅完成一个邮件格式验证。服务处于流程之中,因此有前后顺序。
  • 前序服务,在流程中处于当前讨论的服务的前一个服务,上面例子中,service2是service5的前序服务。
  • 后继服务,在流程中处于当前讨论的服务的后一个服务,上面例子中,service5是service2的后继服务。
  • 框架内置服务,由Flower框架内置提供的服务,通常用来实现某种消息模式。
  • 消息:消息是服务的输入和输出,消息经过流程中的一个又一个服务的处理,完成系统的功能。
  • 流程:通过流程定义,将多个服务联系起来,构成这个系统的业务处理流程,Flower应用中,流程及服务边界的设计是重要的设计目标。
  • 流程通道:定义好的流程,会被Flower框架加载,构成流程实例通道,简称流程通道,一个流程通道,就是一个消息的处理流通道。一个流程定义可以被加载创建多个流程通道。如果流程入口消息是并发发送过来的,比如典型的web应用入口,那么就应该创建多个流程通道。
  • 流程通道路由器:如果有多个流程通道,在流程入口处,使用流程通道路由器进行消息选择,由路由器将消息发送给不同的流程通道去处理。

Flower消息处理模式

消息除了将服务串联起来,构成一个简单的串行流程,还可以组合应用,产生更强大的功能。

消息分叉

消息分叉是指,一个服务输出的消息,可能产生分叉,分发给1个或者多个其他服务。消息分叉后有两种处理方式,全部分发和条件分发。

全部分发

将输出消息分发给全部流程后续服务。后续多个服务接受到消息后,并行执行。这种模式多用于可并行执行的多个子任务,比如用户注册成功后,需要1、将用户数据写入数据库,2、给用户发送激活邮件,3、给用户发送通知短信,4、将新用户注册信息发送给关联产品,实现账户打通。上述4个服务就可以采用消息全部分发模式,接受用户注册消息,并发完成上述4个任务。

要实现消息全部分发,需要在流程中进行配置,所有需要接受前序服务的输出消息的服务都要配置在流程中,如

service1 -> service2
service1 -> service3

service1是前序服务,service2和service3是后继服务。 如果service2和service3的class定义中,实现Service接口的声明中指定了泛型,则泛型类型必须是service1的输出类型或者其父类。

Service1

public class Service1 implements Service {
@Override
public Object process(Object message) {
return new Message2();
}
}

Service2

// Service2声明类型为Service1的输出类型
public class Service2 implements Service {
@Override
public Object process(Message2 message) {
return message.getAge() + 1;
}
}

Service3

// Service3声明类型为Service1的输出类型
public class Service3 implements Service {
@Override
public Object process(Message2 message) {
return message.getName().toUpperCase();
}
}

条件分发

有时候,前一个服务产生的消息,根据消息内容和业务逻辑可能会交给后续的某一个服务处理,而不是全部服务处理。比如用户贷款申请,当前服务计算出用户信用等级后,需要根据信用等级判断采用何种贷款方式,或者是拒绝贷款,不同贷款方式和拒绝贷款是不同的服务,这些服务在流程配置的时候,都需要配置为前序服务的后继服务,但是在运行期根据条件决定将消息分发给具体哪个后继服务。

实现条件分发在流程配置上和全部分发一样,所有可能的后继服务都要配置在流程中。具体实现条件分发有如下三种方式。

根据泛型进行分发

后续服务实现接口的时候声明不同的泛型类型,前序服务根据业务逻辑构建不同的消息类型,Flower会根据消息类型匹配对应的服务,只有成功匹配,消息才发送给过去。比如:

构建流程

//构建流程,ServiceB和ServiceC为ServiceA的后续流程
ServiceFlow.buildFlow("sample", "serviceA", "serviceB");
ServiceFlow.buildFlow("sample", "serviceA", "serviceC");

声明ServiceB接受的消息类型为MessageB

public class ServiceB implements Service {
@Override
public Object process(MessageB message) {
System.out.println("I am Service B.");
return null;
}
}

声明ServiceC接受的消息类型为MessageC

public class ServiceC implements Service {
@Override
public Object process(MessageC message) {
System.out.println("I am Service C.");
MessageX mx = new MessageX();
mx.setCondition("serviceE");
return mx;
}
}

ServiceA

public class ServiceA implements Service {
@Override
public Object process(String message) {
if ("b".equals(message)) {
return new MessageB();
}
if ("c".equals(message)) {
return new MessageC();
}
return null;
}
}

ServiceB是ServiceA的后续服务,ServiceA收到的消息如果是字符串“b”,就会返回消息类型B,这时候框架就会将消息发送给ServiceB,而不会发送给ServiceC。

在消息中指定后继服务的id进行分发

前序消息实现Condition接口,并指定后继服务的id,如:

//serviceE和serviceD是serviceC的后继服务
ServiceFlow.buildFlow("sample", "serviceC", "serviceD");
ServiceFlow.buildFlow("sample", "serviceC", "serviceE");
//消息实现Condition接口
public class MessageX implements Condition {
private Object condition;
public void setCondition(Object src) {
this.condition = src;
}
@Override
public Object getCondition() {
return condition;
}
}
//在serviceC的返回消息中设定要分发的后续服务id
public class ServiceC implements Service {
@Override
public Object process(MessageC message) {
System.out.println("I am Service C.");
MessageX mx = new MessageX();
//设定将消息分发给后续服务serviceE
mx.setCondition("serviceE");
return mx;
}
}

一般说来,服务是可复用的,可复用于不同的流程中,但是在不同的流程中后继服务可能是不同的,后继服务的id也是不同的,在服务中写死后续服务id,显然不利于服务的复用。解决方案有两种,一种是在不同的流程中,写一个专门用于分发的服务,也就是处理业务逻辑的服务并不关心消息的分发,只管返回消息内容,但是其后继服务是一个专门用来做消息分发的服务,这个服务没有业务逻辑,仅仅实现Condition接口根据消息内容指定后继服务。

另一种是使用框架内置服务ConditionService进行消息分发

使用框架内置服务ConditionService进行消息分发

ConditionService是一个通用的消息分发服务,

 ServiceFlow.buildFlow("sample", "serviceE", "serviceCondition");
ServiceFlow.buildFlow("sample", "serviceCondition", "serviceF");
ServiceFlow.buildFlow("sample", "serviceCondition", "serviceG");

服务serviceE要将消息根据条件分发给serviceF或者serviceG,流程配置如上,中间加入serviceCondition进行适配。 serviceCondition的服务注册方法为

 ServiceFactory.registerService("serviceCondition",
"com.ly.train.flower.common.service.ConditionService;serviceF,serviceG");

com.ly.train.flower.common.service.ConditionService为框架内置服务

public class ServiceE implements Service {
@Override
public Object process(Object message) {
System.out.println("I am Service E.");
MessageX x = new MessageX();
x.setCondition(1);
return x;
}
}

这种方式中,依然需要在serviceCondition的前驱服务serviceE中设置返回消息的condition,但是不必设置后续服务的id,只需要设置后续服务的顺序号即可。

几种条件分发的代码示例参考/flower.sample/src/main/java/com/ly/train/flower/common/sample/condition/Sample.java

消息聚合

对于全部分发的消息分叉而言,通常目的在于使多个服务能够并行执行,加快处理速度。通常还需要得到这些并行处理的服务的全部结果,进行后续处理。 在Flower中,得到多个并行处理服务的结果消息,称为消息聚合。实现方式为,在流程中,配置需要聚合的多个消息的后续服务为com.ly.train.flower.common.service.AggregateService,这是一个框架内置服务,负责聚合多个并行服务产生的消息,将其封装到一个Set对象中返回。 如流程

service2 -> service5
service3 -> service5
service5 -> service4

这里的service5就是一个消息聚合服务,负责聚合并行的service2和service3产生的消息,并把聚合后的Set消息发送给service4. 服务配置如下,service5配置为框架内置服务AggregateService。

service2 = com.ly.train.flower.sample.textflow.Service2
service3 = com.ly.train.flower.sample.textflow.Service3
service4 = com.ly.train.flower.sample.textflow.Service4
service5 = com.ly.train.flower.common.service.AggregateService

service4负责接收处理聚合后的消息,从Set中取出各个消息,分别处理。

public class Service4 implements Service {
@Override
public Object process(Set message) {
Message2 m = new Message2();
for (Object o : message) {
if (o instanceof Integer) {
m.setAge((Integer) o);
}
if (o instanceof String) {
m.setName(String.valueOf(o));
}
}
Message3 m3 = new Message3();
m3.setM2(m);
return m3;
}

消息回复

Flower中的消息全部都是异步处理,也就是服务之间不会互相阻塞等待,以实现低耦合、无阻塞、高并发的响应式系统。Flower流程调用者发送出请求消息以后,消息在流程中处理,调用者无需阻塞等待处理结果,可以继续去执行其他的计算任务。

和传统的命令式编程不同,通常流程的发起调用者并不是流程处理结果的最终接受者,比如对于web开发,流程的发起者通常是一个servlet,但是真正接受处理结果的是用户端浏览器或者App,流程中的服务可以直接发送处理结果给用户端,而不必通过servlet。也就是调用发起者servlet无需等待流程服务的最终处理结果,将用户请求发送到流程中后,不必阻塞等待处理,可以立即获取另一个用户的请求继续进行处理。

但是Flower也支持调用者阻塞等待消息处理结果,消息回复模式可以使流程调用者得到流程处理的最终结果消息。可参考代码示例 /flower.sample/src/main/java/com/ly/train/flower/common/sample/textflow/Sample.java

Flower web开发模式

Flower集成Servlet3的web开发模式

Flower支持Servlet3的异步模式,请求处理线程在调用Flower流程,并传入AsyncContext对象后立即释放。 代码示例参考/flower.sample/src/main/java/com/ly/train/flower/common/sample/web/async/AsyncServlet.java

开发支持Servlet3的Flower服务,需要实现框架的Service接口,在方法 Object process(T message, ServiceContext context) throws Exception;中,Flower框架会传入一个Web对象,通过context.getWeb()得到Web对象,用以获得请求参数和输出处理响应结果。

Flower集成Spring boot的web开发模式

Flower支持Spring boot开发,在项目中依赖flower.web,实现框架中的Service接口和InitController接口。 初始化@BindController注解需要的参数,在编译过程中自动由flower.web枚举@BindController注解, 生成Spring boot需要的Controller。

注意: flower.web利用annotation为Service生成spring boot所需的Controller类。这个生成过程在程序编译的时候完成,如果IDE环境不支持热编译,需要在命令行执行mvn install生成代码。

代码示例参考/flower.sample/src/main/java/com/ly/train/flower/common/sample/springboot

  • @BindController path参数: http请求的url路径
  • @BindController method参数: http发起的请求方式(GET, POST)
  • @BindController paramClass参数: http发送的参数反序列化的对象类型
  • @BindController 不指定paramClass,GET和POST方式都需要在process中使用context.getWeb()获取AsyncContext, 自行获取参数
  • @BindController method=GET paramClass 指定了类,需要实现Service接口,process的第一个参数返回paramClass对象
  • @BindController method=POST paramClass 指定了类, 没有继承PostJson接口,需要实现Service接口,process的第一个参数返回paramClass对象 http请求的参数:name=aaa&id=2222, header需要Content-Type: application/x-www-form-urlencoded
  • @BindController method=POST paramClass 指定了类, 继承PostJson接口,需要实现Service接口,process的第一个参数返回paramClass对象 http请求的参数:{"name":"aaa","id":111}, header需要Content-Type: Content-Type: application/json;charset=UTF-8

使用Flower框架的开发建议

  • 进行流程设计。服务边界,服务流程,消息类型和数据,在系统设计阶段充分考虑,流程设计好了,系统架构也就设计好了。
  • 进行消息设计。在Flower里,消息也是服务之间的的接口,设计好消息,对服务的输入和输出约束就有了,团队开发时,就可以基于消息接口,各自开发自己的Service,只要严格遵循消息规范,不同开发者在开发的时候不需要彼此依赖,可以提高并行开发速度。而且只要各自做好自己的单元测试,集成测试的时候问题和工作量会少很多。
  • 尽量提高可并行的服务数量,特别是不同资源消耗性的服务的并行;比如CPU密集型的、内存消耗型的、IO密集型的任务并行执行,可以极大提供系统的资源利用率。
  • 如无非常必要,请勿阻塞等待,阻塞等待将会挂起Flower底层的akka线程,也会占用Flower的流程通道。

Flower分布式开发

Flower分布式部署架构

  • Flower.center: Flower实现的注册中心,用于注册服务信息、流程信息
  • Flower容器: 业务开发的代码部署到Flower服务容器中启动
  • Flower网关: 服务流程编排,流程入口程序

开发流程

一. 启动Flower.center注册中心

二. 开发Flower Service,启动业务服务Flower容器,自动向注册中心注册服务

三. 开发Flower web网关,启动Flower网关服务,编排流程

一. 注册中心

Flower.center基于spring-boot开发,通过打包成fat-jar后通过命令行启动即可。

Flower注册中心启动入口/flower.center/src/main/java/com/ly/train/flower/center/CenterApplication.java Flower注册中心启动命令java -jar flower.center-0.1.2.jar

二. 启动业务Flower容器

Flower部署支持Flower容器和Spring容器,下面的例子基于spring-boot演示

2.1 创建配置文件flower.yml

name: "LocalFlower"
basePackage: com.ly.train.order.service
host: 127.0.0.1
port: 25004
registry:
- url: "flower://127.0.0.1:8096?application=LocalFlower"

2.2 配置FlowerFactory

@Configuration
public class FlowerConfiguration {
@Bean
public FlowerFactory flowerFactory() {
FlowerFactory flowerFactory = new SpringFlowerFactory();
flowerFactory.start();
return flowerFactory;
}
}

2.3 开发flower服务

@FlowerService
public class CreateOrderService implements Service {
private static final Logger logger = LoggerFactory.getLogger(CreateOrderService.class);
@Override
public Boolean process(Order message, ServiceContext context) throws Throwable {
try {
logger.info("创建订单 : {}", message);
} catch (Exception e) {
logger.error("", e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}

2.4 创建启动类

@SpringBootApplication
@ComponentScan("com.ly.train.order")
@FlowerComponentScan("com.ly.train.order")
@EnableAutoConfiguration
@MapperScan(basePackages = "com.ly.train.order.dao")
public class OrderPlatformApplication {
public static void main(String[] args) {
SpringApplication.run(OrderPlatformApplication.class, args);
}
}

运行启动类即可实现Flower服务自动注册,并对外提供服务

三. 启动网关服务器,编排流程

Flower网关服务器基于spring-mvc展示,跟flower服务一样,需要提供flower.yml配置信息,并配置FlowerFactory

3.1 创建flower.yml

name: "LocalFlower"
basePackage: com.ly.train.web
host: 127.0.0.1
port: 25006
registry:
- url: "flower://127.0.0.1:8096?application=LocalFlower"

3.2 配置FlowerFactory

@Configuration
public class FlowerConfiguration {
@Bean
public FlowerFactory flowerFactory() {
FlowerFactory flowerFactory = new SpringFlowerFactory("flower.yml");// 为了显示更多使用方式,这里指定配置文件路径
flowerFactory.start();
return flowerFactory;
}
}

3.3 开发Flower服务

@FlowerService(type = FlowerType.AGGREGATE)// 聚合服务类型
public class EndService extends AbstractService, Object> implements Flush, HttpComplete, Complete {
private Logger logger = LoggerFactory.getLogger(EndService.class);
@Override
public Object doProcess(List message, ServiceContext context) throws Throwable {
Response> res = R.ok(message);
String ret = JSONObject.toJSONString(res, true);
context.getWeb().print(ret);
logger.info("聚合服务收到消息:" + message);
return message;
}
@Override
public void onError(Throwable throwable, List param) {
super.onError(throwable, param);
}
}

3.4 开发网关Controller

@RestController
@RequestMapping("/order/")
@Flower(value = "createOrderFlow", flowNumber = 8)
public class CreateOrderController extends FlowerController {
@Autowired
OrderNoService orderNoService;
@RequestMapping(value = "createOrder")
public void createOrder(OrderExt orderExt, HttpServletRequest req) throws IOException {
orderExt.setOrderNo(orderNoService.generateOrderNo());
orderExt.setCreateTime(new Date());
logger.info("收到请求:{}", orderExt);
doProcess(orderExt, req);
}
@Override
public void buildFlower() {
getServiceFlow().buildFlow(StartService.class.getSimpleName(), Arrays.asList("CreateOrderService", "CreateOrderExtService"));
getServiceFlow().buildFlow(Arrays.asList("CreateOrderService", "CreateOrderExtService"), "EndService");
getServiceFlow().build();//流程创建完成会自动注册流程信息到注册中心
}
}

集成Flower提供的基类FlowerController,使用方可以使用SpringMVC提供的注解,最大程度上保留SpringMVC的功能,学习成本几乎为零,里面封装了一些细节,让使用更关注业务开发。如果熟悉Flower的使用方式,使用方也可以完全自行扩展。

3.5 启动类

@SpringBootApplication
@ComponentScan("com.ly.train.web")
@FlowerComponentScan("com.ly.train.web.service")
@EnableAutoConfiguration
public class WebApplication {
public static void main(String[] args) {
SpringApplication.run(WebApplication.class, args);
}
}

实例项目细节

flower分布式实例 https://github.com/leeyazhou/flower.showcase.git

More

核心概念

  • FlowerFactory Flower框架的入口程序,同一个JVM进程中可以创建多个FlowerFactory,互相不影响,实现应用隔离。
  • ServiceFactory 管理Flower框架中的服务,包括流程管理和服务管理
  • FlowRouter 流程的载体,一个FlowerRouter包含一个流程相关信息
  • ServiceRouter 服务的载体,一个ServiceRouter包含一个服务的相关信息

FlowerFactory

  • 方法一

使用默认的FlowerFactory

FlowerFactory flowerFactory = SimpleFlowerFactory.get();
flowerFactory.start();
flowerFactory.stop();
  • 方法二

按需创建自己的FlowerFactory,配置文件路径默认读取classpath:flower.yml,配置文件内容格式为yaml风格,详情查看配置信息。

FlowerFactory factory = new SimpleFlowerFactory("conf/flower_25003.yml");
factory.start();
factory.stop();

获取FlowerFactory之后,就可以使用它提供的接口:

 /**
* 获取Flower容器配置信息
*
* @return {@link FlowerConfig}
*/
FlowerConfig getFlowerConfig();
/**
* 获取注册中心
*
* @return {@link Registry}
*/
Set getRegistry();
/**
* 异常处理器
*
* @return {@link ExceptionHandler}
*/
ExceptionHandler getExceptionHandler();
/**
* akka Actor 工厂
*
* @return {@link ServiceActorFactory}
*/
ServiceActorFactory getServiceActorFactory();
/**
* {@link Service}工厂
*
* @return {@link ServiceFactory}
*/
ServiceFactory getServiceFactory();
ServiceFacade getServiceFacade();

FlowRouter流程路由器,创建流程之后,通过FlowerFactory可以创建出对应的路由器,之后便可以进行服务的调用了。

FlowRouter flowRouter = factory.getServiceFacade().buildFlowRouter("flowerSample", 2 << 6);
flowRouter.syncCallService(message);
flowRouter.asyncCallService(message, ctx);

分布式

Flower.yml配置信息

name: "LocalFlower"
host: "127.0.0.1"
port: 25003
# 注册中心地址
registry:
- url:
- "flower://127.0.0.1:8096"
- url:
- "flower://127.0.0.1:8096"
basePackage: com.ly.train.flower
  • name 服务名称
  • host 服务的地址
  • port 服务对外暴露的端口,也是当前Flower监听的段端口
  • registry 注册中心,对于有多个注册中心的服务,需要配置多个地址
  • basePackage 服务扫描的路径,扫描到对应的FlowerService之后会自动注册
  1. Flower容器启动后,会把本地的服务元数据注册到注册中心,便可以为其他应用提供服务;流程编排的过程中,会优先使用本地容器中包含的容器,如果在本地找不到对应的服务信息,会从注册中心拉取服务信息,并创建RemoteActor备用。
  2. 流程创建完毕,会把流程配置信息上传到注册中心。如果流程中涉及到本地Service和远程Service进行混排时,那么远程Service执行完毕后,可能需要把消息回传到本地Service中,这时需要从注册中心拉取Flow的配置信息,然后才能获取到当前服务的下一个Service的地址。

Flower框架设计

了解关于Flower的内部设计,有助于你更好地利用Flower开发一个反应式系统。

Flower core模块(进程内流式微服务框架)设计

Flower基于Akka的Actor进行开发,将Service封装到Actor里面,Actor收到的消息作为参数传入Service进行调用,Service的输出发送给后续Actor作为Service的输入。

Flower核心类

  • 用户开发的Service实现Service或者HttpService接口
  • ServiceFactory负责用户以及框架内置的service实例管理(加载*.services文件)
  • ServiceFlow负责流程管理(加载*.flow文件)
  • ServiceActor将Service封装到Actor

Flower初始化及调用时序

服务流程初始化

  • 开发者通过ServiceFacade调用已经定义好的服务流程
  • ServiceFacade根据传入的flow名和service名,创建第一个ServiceActor
  • ServiceActor通过ServiceFactory装载Service实例,并通过ServiceFlow获得配置在流程中当前Service的后续Service(可能有多个)
  • 递归创建后续Service的ServiceActor,并记录后续ServiceActor的ActorRef

消息流处理

  • 调用者发送给ServiceFacade的消息,将被flow流程中的第一个ServiceActor处理
  • ServiceActor调用对应的Service实例
  • ServiceActor将Service实例的返回值作为消息发送给流程定义的后续ServiceActor

Flower的核心设计不过如此。但是由此延伸出来的应用方法和设计模式却和Akka有了极大的不同。

分布式流式微服务框架设计

传统的分布式微服务框架通过远程调用的方式实现服务的解耦与分布式部署,使得系统开发、维护、服务复用、集群部署更加方便灵活,但是这种微服务依然许多不足之处

  • 具有较高的耦合性,服务之间需要在代码层面依赖调用,如果想要增加新的依赖关系,必须修改代码,修改代码是一切混乱的起源
  • 服务之间同步阻塞调用,在被依赖的服务调用返回之前,当前服务必须阻塞等待,如果调用了几个服务,后面的服务必须串行等待前面的服务完成才能开始调用
  • 服务的粒度不好控制,微服务如何设计没有统一的指导思想,不同系统的微服务设计千差万别,不成熟的团队因为使用微服务架构而更加混乱

流式微服务框架Flower致力于构建一种新的微服务架构体系,使用流式计算的架构思想,以一种更加轻量、更易于设计开发、消息驱动、弱依赖,异步并发的技术特点开发实现微服务系统

  • 服务之间消息驱动,不需要直接依赖,没有代码耦合
  • 服务之间异步调用,前面的服务完成后,发送消息后不用管,后面的服务异步处理消息
  • 服务的粒度天然控制在消息的层面,每个服务只处理一个消息,而消息对于通常的web开发是天然的,一个请求就是一个消息,一个订单就是一个消息,一个用户也是一个消息,而消息就是模型,所以只要做好领域模型设计,无需用模型再去驱动设计,只需要让模型,也就是消息流动起来就可以了,模型流动到不同的服务,被不断计算、填充完善,最后完成处理就可以了,是真正的面向模型设计。

架构

部署模型

Flower将整个应用系统集群统一管理控制,控制中心控制管理集群的所有资源

Agent部署在集群每一台服务器上,负责加载服务实例,并向控制中心汇报状态

代码仓库负责管理服务的java程序包,程序包用assembly打包

控制中心和Agent基于Akka开发,每个服务包装一个actor里面,actor之间负责消息的通信

集群启动与服务部署时序模型

注册服务数据结构

  1. 服务名:字符串,全局唯一
  2. 服务路径名:class全路径名,全局唯一
  3. 服务jar包名:服务所在的jar文件名,全局唯一
  4. 所有者、使用者:权限控制
  5. agent列表:服务需要在哪些agent启动的列表,可动态添加
  6. 服务编排与消息通信

服务之间的依赖关系在控制中心编排

  1. 服务编排时,只需要编排每个服务的后续服务,1:0..n
  2. 从整个系统看,所有服务构成一个有向无环图DAG
  3. 服务自身不负责消息通信,消息通信由akka的actor完成
  4. 每个服务只处理一种消息
  5. //TODO 服务接口定义 public object process(object message)

本文来自投稿,不代表本人立场,如若转载,请注明出处:http://www.souzhinan.com/kj/63688.html