上篇文章,简单介绍了HSF框架的初始化。这一篇,小编将为大家带来HSF provider的启动和服务细节。主要关注点:Server的启动,服务的注册,调用处理主流程(IO线程,业务线程)。
一. Server的启动
在某一个HSFSpringProviderBean初始化时,启动HSF Server。在HSF默认协议使用TBRemoting作为RPC框架,TBRemotingRPCProtocolComponent.registerProvider中:
providerServer.startHSFServer();
其代码如下:
Server server = new DefaultServer(configService.getHSFServerPort());
server.start();
……
DefaultServer构造:
acceptor = new SocketAcceptor(processorCount, IO_THREAD_POOL);
SocketAcceptorConfig cfg =(SocketAcceptorConfig)acceptor.getDefaultConfig();
cfg.setBacklog(BACKLOG_SIZE);
cfg.setThreadModel(ThreadModel.MANUAL);
cfg.getFilterChain().addLast(CODEC_FILTER_NAME, CODEC_FILTER);
1. 可以看到是使用了mina作为NIO框架,启动一个server的SocketAcceptor,并配置
2. 关键的一点是在filter的最后加了一个CODEC_FILTER,做为序列化的filter。后续会单独分析。
Server的start:
IoHandlerioHandler = newDefaultIoHandler(this);
try {
acceptor.bind(serverSocket,ioHandler);
started= true;
LOGGER.warn("服务器已启动:" + serverSocket);
}
1. 构造一个默认handler,作为接受RPC请求的处理入口
2. 绑定端口
已上就完成了HSF的Server启动。此时端口已经打开,可以接受请求了。
二. 服务的注册
在启动server之后,HSF会将这个provider信息注册到configserver上。入口是: metadataService.publish(metadata);
具体实现:
privatePublisher<String> doPublish(ServiceMetadata metadata) {
final String serviceUniqueName =metadata.getUniqueName();
final String data =HSFServiceTargetUtil.getTarget(configService.getHSFServerPort(), metadata);
final String publisherId =PUBLISHER_PREFIX + serviceUniqueName;
PublisherRegistration<String>registration = new PublisherRegistration<String>(publisherId,serviceUniqueName);
registration.setGroup(metadata.getGroup());
Publisher<String> publisher =PublisherRegistrar.register(registration);
publisher.publish(data);
return publisher;
}
1. 拼成TB Remoting格式的Target地址信息: 10.7.42.161:12200?CLIENTRETRYCONNECTIONTIMES=3&CLIENTRETRYCONNECTIONTIMEOUT=1000&_SERIALIZETYPE=hessian&_IDLETIMEOUT=10&_TIMEOUT=5000
2. 构造PublisherRegistration数据实体,代表一次发布,设置provider的分组,给分组调用用。
3. 注册一个新的发布者身份以发布数据
a. 默认使用IsolatedPublisherRegistrar进行注册
b. 构造默认发布者DefaultPublisher
DefaultPublisher(PublisherRegistration<T>registration, ConfigClientWorker worker) {
super(registration, worker);
......
this.regRequest = newPublisherRegReqPacket(registration.getDataId(), registration.getClientId(),
datumId != null ? datumId :UUID.randomUUID().toString()); //UUID as default datum ID.
.......
}
在父类DefaultDataClient中初始化ConfigClientWorker:
this.worker= (worker != null) ? worker : ConfigClientWorker.getDefaultWorker();
ConfigClientWorker是config server客户端的核心类,负责管理客户端task的异步执行,当然发布provider也是一个task。初始化ConfigClientWorker时,会初始化其内部的WorkThread线程和连接管理ConfigClientConnection。
可见ConfigClientWorker管理着业务线程和IO线程之间的交互,非常重要。此时,work线程还没启动,和Config server的连接也没有连接,只是初始化而已。
4. 发布数据
a. 先看看data是否可序列化,使用java序列化data
newObjectOutputStream(new OutputStream() {
@Override public void write(intb) throws IOException {}
}).writeObject(datum);
b. 通过之前创建的ConfigClientWorker提交一个任务 worker.schedule(this);
c. 将task放入阻塞队列 tasks.put(task);
d. 唤醒work线程,workerThread.signal();
e. 如果是第一次,则启动work线程
synchronized(bell){
if (isAlive()) {
bell.notifyAll();
} else {
try {
start(); // Lazy start
}catch(IllegalThreadStateException e) {
log.fatal("[Internal] Worker thread is dead.");
}
}
}
可以发现发布数据是一个异步操作,中间通过ConfigClientWorker隔开了,而WorkThread是处理task的线程,task包括发送和接受,对应ConfigClientWorker的2个属性:
// Tokeep messages from configServer. Access is shared between deliverer thread andcommunication thread.
finalQueue<ProtocolPackage> mailbox = newLinkedBlockingQueue<ProtocolPackage>(MAX_MAILBOX_SIZE);
// Access isshared between user thread and deliverer thread.
private final TaskQueue tasks =new TaskQueue();
f. Work线程执行发布任务,创建和ConfigServer的连接
privatevoid ensureConnected() throws InterruptedException {
if (connection.isConnected())return;
// Block until connected to server.
while(! connection.connect()) {
Thread.sleep(GLOBAL_RECONNECTING_DELAY);
}
}
之前创建的Connection在这里同步初始化连接,使用TB remoting框架,这里最终是通过一个共享的资源器拿到连接,有兴趣的同学可以单独研究哈。最终的mina client代码:
ConnectFutureminaConnectFuture = connector.connect(
ConnectionUrl.socketAddr(targetUrl),
localAddress,
new ClientIoHandler(this,clientKey, copyListeners),
cfg);
g. 连接创建好之后,开始发送数据 resp= sendReceive(packagee);此处最终会调用mina的session.write(connectionMsg, wfl); 发送数据是异步+wait的过程,发送完成之后会起一个定时timer到超时点时运行,返回客户端超时异常。
TimeoutHandle timeoutHandle = new TimeoutHandle();
timeoutFuture =DefaultClientManager.timer.schedule(timeoutHandle,connRequest.getRespTimeout(), TimeUnit.MILLISECONDS);
三. 服务调用
Provider注册完之后,consumer就可以通过configserver拿到地址了,发起调用了。
在server端通过mina框架的handler接受请求并处理。在mina的filter端有一个codecfilter类RemotingProtocolCodecFilter。对应的decoder和encoder为RemotingProtocolEncoder和RemotingProtocolDecoder。
在Decoder关键点:
a. 使用session保存半包的请求,二进制协议的关键
b. 按协议对ByteBuffer进行解析,输出到ProtocolDecoderOutput
/*
* 新版报文组成:
* Header(1B): 报文版本
* Header(1B): 请求/响应
* Header(1B): 报文协议(HESSIAN/JAVA)
* Header(1B): 单向/双向
* Header(1B): Reserved
* Header(4B): 通信层对象长度
* Header(1B): 应用层对象类名长度
* Header(4B): 应用层对象长度
* Body: 通信层对象
* Body: 应用层对象类名
* Body: 应用层对象
*/
Codec之后会调用DefaultIoHandler.messageReceived进行处理:
DefaultConnection conn =DefaultConnection.findBySession(session);
conn.getMsgReceivedListener().messageReceived(conn, message);
DefaultMsgListener端会执行doRequest方法:
a. 构造响应的ConnectionResponse对象
b. 拿之前注册的processor即ProviderProcessor
c. 拿之前注册的处理请求的线程池
d. 启动一个runnable,扔到线程池中执行
e. Mina io线程返回
业务线程端:
a. 调用ProviderProcessor. handleRequest进行处理
b. 切换classloader到app的classloader,不然会找不到app的类
Thread.currentThread().setContextClassLoader(servicePOJO.getClass().getClassLoader());
c. 反射调用服务方法
Object appResp =workerMethod.invoke(servicePOJO, methodArgs);
d. 返回执行结果
四. 小结
本文简单小结了HSF的服务启动和调用,典型的RPC应用,中间的一些知识点,序列化,nio,异步调用等都是java的基础知识,希望通过本文让大家增加对rpc框架的认识~~
HSF由于使用了OSGI容器,导致对app容器的依赖,扩展性上确实做的不够,tb remoting基本和mina绑死了,这也是淘宝应用的一个特点,能跑能解决业务问题就是王道~~
分享到:
相关推荐
Springboot+HSF分布式服务框架+EDAS注册中心,实现发布订阅模式
EDAS中HSF方式启动服务入门文档,详细描述了EDAS中HSF方式的启动过程
HSF服务框架共28页.pdf.zip
分布式服务框架原理与实践(Dubbo,HSF)_李林锋著
HSF 框架的原理讲解,主要包含了知识点:rpc,动态代理,HSF可以作为微服务的基础框架进行二次开发
hsf框架所需工具包 taobao-hsf.sar 直接放入tomcat根目录tomcate/deploy/下即可
淘宝的HSF框架,用户手册,有兴趣的人欢迎下载~内部文档哦~
内部分享的hsf使用介绍文档(无涉密信息),有兴趣的同学可以看看
高速服务框架 HSF (High-speed Service Framework),HSF 作为一个纯客户端架构的 RPC 框架!HSF应用开发从安装、代码编写到部署详细教程!
taobao-hsf安装包
HSF 官网Demo、IDEA 与 eclipse 开发环境说明;HSF,包结构等等
SF全称为High-Speed Service Framework,旨在为淘宝应用提供一个分布式的服务框架,HSF从分布式应用层面以及统一的发布/调用方式层面为大家提供支持,从而可以很容易的开发分布式的应用以及提供戒使用公用功能模块,...
自己公司用的HSF架构培训手册,当时还有阿里的人来培训我们,希望帮助到需要用的HSF技术的人
HSF物料风险等级评估及抽样检验规范.docx
笔者工作的这几年之中,总结并开发了如下几个框架: summercool(Web 框架,已经应用于某国内大型网络公司的等重要应用)、summercool-hsf(基于Netty实现的RPC框架,已经应用国内某移动互联网公司)、 summercool-...
HFS网络文件服务器V2.3Beta完美汉化版很好的一个文件传输软件,和FTP一样的稳定性,但用的确实HTTP协议,文件传输好,汉化版简单易用!... HFS网络文件服务器V2.3Beta完美汉化版很好的一个文件传输软件,和FTP一样的...
NULL 博文链接:https://arlenye.iteye.com/blog/2294835
DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治理方案的核心框架,每天为2,000+个服务提供3,000,000,000+次访问量支持,并被广泛应用于阿里巴巴集团的各成员...
field 搬运(图片格式) 注意仅仅是 其中的一部分 hsf_0 hsf_1 (原文件中有hsf_0 --hsf_7) 每个hsf中有数字 字母 小写字母, 大写字母 ,每个数字和字母 都大约有400个样本,我自己是足够用啦 因为整个的包550M太...