iwinit

ZooKeeper in Depth, Part 2: Session Establishment


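The previous post covered how the ZooKeeper server starts up. This post explains how the client and the server establish a session, using the DataMonitor example from the official site. Walking through session establishment gives a good overview of how the client and the server handle requests and how they communicate with each other.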

The DataMonitor code from the official site:

Executor

 

public class Executor implements Watcher, Runnable,
		DataMonitor.DataMonitorListener {
	String znode;

	DataMonitor dm;

	ZooKeeper zk;

	String filename;

	String exec[];

	Process child;

	// Executor is a Watcher, but it delegates all event handling to DataMonitor
	public Executor(String hostPort, String znode, String filename,
			String exec[]) throws KeeperException, IOException {
		this.filename = filename;
		this.exec = exec;
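		// Initialize the ZooKeeper client. This opens the connection, creates the session,
		// and starts the client-side SendThread; all of it happens asynchronously.
		zk = new ZooKeeper(hostPort, 3000, this);
		// DataMonitor is the class that does the real work
		dm = new DataMonitor(zk, znode, null, this);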
	}

 

 DataMonitor

 

public class DataMonitor implements Watcher, StatCallback {

.......

	public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
			DataMonitorListener listener) {
......
		// Get things started by checking if the node exists. We are going
		// to be completely event driven: an asynchronous exists() call that
		// registers a watcher and sets the callback
		zk.exists(znode, true, this, null);
	}

......
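	// Handle watcher notification events
	public void process(WatchedEvent event) {
		String path = event.getPath();
		if (event.getType() == Event.EventType.None) {
			// Connection state change; the handling is elided in this excerpt
			......
		} else {
			// An event for the watched znode fired (create, delete, setData). Watchers are
			// one-shot, so register again; the business logic lives in the callback.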
			if (path != null && path.equals(znode)) {
				// Something has changed on the node, let's find out
				zk.exists(znode, true, this, null);
			}
		}
		if (chainedWatcher != null) {
			chainedWatcher.process(event);
		}
	}
	// Handle the result of the asynchronous exists() call
	public void processResult(int rc, String path, Object ctx, Stat stat) {
		boolean exists;
		switch (rc) {
		case Code.Ok:
			exists = true;
			break;
		case Code.NoNode:
			exists = false;
			break;
		case Code.SessionExpired:
		case Code.NoAuth:
			dead = true;
			listener.closing(rc);
			return;
		default:
			// Retry errors
			zk.exists(znode, true, this, null);
			return;
		}
		// If the node exists, fetch its data synchronously
		byte b[] = null;
		if (exists) {
			try {
				b = zk.getData(znode, false, null);
			} catch (KeeperException e) {
				// We don't need to worry about recovering now. The watch
				// callbacks will kick off any exception handling
				e.printStackTrace();
			} catch (InterruptedException e) {
				return;
			}
		}
		// If the data has changed, pass it to the listener
		if ((b == null && b != prevData)
				|| (b != null && !Arrays.equals(prevData, b))) {
			listener.exists(b);
			prevData = b;
		}
	}
}

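  Starting from this example, let's analyze how ZooKeeper's first step, session establishment, works. It mostly comes down to the construction of the ZooKeeper class.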

ZooKeeper constructor

 

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        // Set the default watcher
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        // Parse the configured server list into serverAddresses. The list is
        // shuffled here, so the servers are no longer in their configured order.
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        // Create the client connection; this constructs the SendThread and EventThread
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        // Start the SendThread and EventThread
        cnxn.start();
    }

 Initializing the connection

 

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        // Client session id
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        // Session timeout requested by the client
        this.sessionTimeout = sessionTimeout;
        // Server host list
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        // Connect timeout: the session timeout divided across the hosts
        connectTimeout = sessionTimeout / hostProvider.size();
        // Read timeout
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        // Create the client's two core threads. SendThread is the client's I/O thread;
        // EventThread takes the events SendThread queues and invokes the matching watchers.
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }
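
To make the two derived timeouts concrete, here is a minimal sketch that reproduces the arithmetic from the constructor above. The class and method names are made up for illustration, and the three-host ensemble is an assumption; only the two formulas come from the code shown.

```java
class ClientTimeoutSketch {
    // connectTimeout = sessionTimeout / number of hosts (integer division)
    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    // readTimeout = two thirds of the session timeout
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 3000; // the value Executor passes to new ZooKeeper(...)
        int hosts = 3;             // assumed ensemble size
        System.out.println("connectTimeout=" + connectTimeout(sessionTimeout, hosts)); // 1000
        System.out.println("readTimeout=" + readTimeout(sessionTimeout));             // 2000
    }
}
```

With more hosts in the connect string, each individual connection attempt gets a smaller share of the session timeout.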

 SendThread main loop

 

public void run() {
            .....
            while (state.isAlive()) {
                try {
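                    // If not connected yet, start connecting. The method name is misleading:
                    // it only checks whether sockKey has been registered, so it can return
                    // true before the connection to the server is actually established.
                    if (!clientCnxnSocket.isConnected()) {
                        ......
                        // Connect asynchronously
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                    // If the state is connected, we really are connected to a server
                    if (state.isConnected()) {
                        ......
                        // Timeout for the next select
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        // Not connected yet: count down the connect timeout
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    // Session timed out (this also covers connect timeouts)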
                    if (to <= 0) {
                        throw new SessionTimeoutException(
                                "Client session timed out, have not heard from server in "
                                        + clientCnxnSocket.getIdleRecv() + "ms"
                                        + " for sessionid 0x"
                                        + Long.toHexString(sessionId));
                    }
                    // If the send side has been idle, send a ping (heartbeat)
                    if (state.isConnected()) {
                        int timeToNextPing = readTimeout / 2
                                - clientCnxnSocket.getIdleSend();
                        if (timeToNextPing <= 0) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If we are in read-only mode, seek for read/write server
                    // Once one is found, tear down the current connection and reconnect to the R/W server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = System.currentTimeMillis();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            // Synchronously probe the next server; if it is an R/W server, RWServerFoundException is thrown
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
                    // Handle I/O
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " : " + e.getMessage());
                        }
                        break;
                    } else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else {
				......
                        }
                        // Clean up the old connection; the loop then connects to the next server
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                }
            }
     ......
        }
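
The timeout bookkeeping in the loop above can be isolated into one function. This is a simplified sketch under stated assumptions: the class and method names are hypothetical, the idle times are passed in as plain integers, and the read-only branch is left out. It only mirrors how `to` is derived before the call to doTransport.

```java
class SelectTimeoutSketch {
    // Returns the select timeout in milliseconds, or throws when the
    // read/connect timeout has already elapsed (the loop above throws
    // SessionTimeoutException in that case).
    static int nextSelectTimeout(boolean connected, int readTimeout, int connectTimeout,
                                 int idleRecv, int idleSend) {
        int to = connected ? readTimeout - idleRecv : connectTimeout - idleRecv;
        if (to <= 0) {
            throw new IllegalStateException("no data from server for " + idleRecv + "ms");
        }
        if (connected) {
            int timeToNextPing = readTimeout / 2 - idleSend;
            // When timeToNextPing <= 0 the real loop sends a ping immediately
            // and leaves the select timeout unchanged.
            if (timeToNextPing > 0 && timeToNextPing < to) {
                to = timeToNextPing; // wake up in time to send the next ping
            }
        }
        return to;
    }

    public static void main(String[] args) {
        // Connected, 500 ms since last receive, 200 ms since last send
        System.out.println(nextSelectTimeout(true, 2000, 1000, 500, 200)); // 800
    }
}
```

The effect is that a connected but idle client wakes up after half the read timeout to send a ping, well before the server could consider the session dead.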

 The connection procedure in detail

 

private void startConnect() throws IOException {
            // Switch the state to CONNECTING
            state = States.CONNECTING;
            // Pick the target address
            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);
            }

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
		......
            // Connect asynchronously
            clientCnxnSocket.connect(addr);
        }

 The connect call itself

 

    void connect(InetSocketAddress addr) throws IOException {
        // Create the client SocketChannel
        SocketChannel sock = createSock();
        try {
            // Register for OP_CONNECT and try to connect
            registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to " + addr);
            sock.close();
            throw e;
        }
        // The session has not been initialized yet
        initialized = false;

        /*
         * Reset incomingBuffer
         */
        // Reset the two read buffers, ready for the next read
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

 The registerAndConnect step:

 

    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
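        // Try to connect
        boolean immediateConnect = sock.connect(addr);
        // If the connection completes immediately (for example on a fast network),
        // send the ConnectRequest right away to ask the server to establish a session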
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }

 primeConnection covers what happens once the connection is up, chiefly establishing the session:

 

void primeConnection() throws IOException {
            ......
            // The client session id defaults to 0
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            // Build the connect request
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
                ......
                // Wrap it in a transport-level Packet and put it at the head of the outgoing queue;
                // for a ConnectRequest the requestHeader is null
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));
            }
            // Make sure both read and write events are being listened for
            clientCnxnSocket.enableReadWriteOnly();
            .....
        }

 At this point the ConnectRequest has been added to the outgoing queue, and SendThread moves on to doTransport:

 

 void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
	//select
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // If the connect did not complete immediately earlier, handle the OP_CONNECT event here
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } 
            // If the channel is ready for reading or writing, handle the I/O
	else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

 Assume the connection is now up and the channel is writable. SendThread then starts sending our ConnectRequest:

 

if (sockKey.isWritable()) {
            // Synchronize on the outgoing queue
            synchronized(outgoingQueue) {
                // Take the next sendable request from the outgoing queue
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    // Update the last-send timestamp
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    // Otherwise serialize the Packet into a ByteBuffer
                    if (p.bb == null) {
                        // Regular requests need a client transaction id (xid)
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        // Serialize
                        p.createBB();
                    }
                    // Write the data
                    sock.write(p.bb);
                    // Fully written: the packet has been sent
                    if (!p.bb.hasRemaining()) {
                        // Count of packets sent
                        sentCount++;
                        // Remove it from the outgoing queue so later requests can be sent
                        outgoingQueue.removeFirstOccurrence(p);
                        // Regular requests go onto the pending queue so the server's response
                        // can be matched to them; anything else is discarded once sent
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                // Everything has been sent, so stop listening for write readiness;
                // if packets remain, keep listening and keep writing
                if (outgoingQueue.isEmpty()) {
                    // No more packets to send: turn off write interest flag.
                    // Will be turned on later by a later call to enableWrite(),
                    // from within ZooKeeperSaslClient (if client is configured
                    // to attempt SASL authentication), or in either doIO() or
                    // in doTransport() if not.
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }

 Here is how the packet is serialized. Note that a ConnectRequest packet has no protocol header:

 

 public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                // Write an int as a placeholder. Once the whole packet is written it is
                // overwritten with the packet length (4 bytes).
                boa.writeInt(-1, "len"); // We'll fill this in later
                // Serialize the protocol header
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                // Serialize the protocol body
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                // Build the ByteBuffer
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                // Overwrite the first 4 bytes with the real length: the total size
                // minus the int-sized length prefix itself
                this.bb.putInt(this.bb.capacity() - 4);
                // Rewind so the buffer is read from the start when it is sent
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
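
The placeholder-then-patch framing used by createBB can be tried in isolation. The sketch below is not ZooKeeper code: `FrameSketch` and `frame` are hypothetical names, and a plain DataOutputStream stands in for BinaryOutputArchive. It only illustrates the "write -1, serialize, then overwrite the first 4 bytes with the payload length" technique.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

class FrameSketch {
    // Builds [4-byte length][payload]; the length is patched in afterwards.
    static ByteBuffer frame(byte[] payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);   // placeholder for the length
            out.write(payload); // stand-in for header and body serialization
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // relative put at position 0 overwrites the placeholder
            bb.rewind();                  // back to position 0, ready to be written to a channel
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(new byte[] {10, 20, 30});
        System.out.println(bb.capacity()); // 7
        System.out.println(bb.getInt(0));  // 3
    }
}
```

Writing the placeholder first means the serializer never needs to know the payload size in advance.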

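The first Packet here is the ConnectRequest. Its packet has no header, so once it is sent it is simply dropped instead of being added to the pending queue. SendThread still has to wait for the server's reply to confirm the connection and initialize the session. With the client now waiting for the server, let's look at how the server handles the ConnectRequest.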

Assume the server's selector thread is up and running. The selector then sees a read-ready event, which is the client's ConnectRequest:

else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);

   if (k.isReadable()) {

                // First read 4 bytes from the channel: the length prefix
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {
                    throw new EndOfStreamException(
                            "Unable to read additional data from client sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely client has closed socket");
                }
                // The buffer is full, so carry on
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    // incomingBuffer is lenBuffer: we have just read the length of the next request
                    if (incomingBuffer == lenBuffer) { // start of next request
                        incomingBuffer.flip();
                        // Allocate an incomingBuffer of that length to hold the rest of the data
                        isPayload = readLength(k);
                        // Clear it so it is ready to be filled
                        incomingBuffer.clear();
                    } else {
                        // continuation
                        isPayload = true;
                    }
                    // Now read the rest of the data
                    if (isPayload) { // not the case for 4letterword
                        readPayload();
                    }
                    else {
                        // four letter words take care
                        // need not do anything else
                        return;
                    }
                }
            }

 The payload is then read as follows:

 

/** Read the request payload (everything following the length prefix) */
    private void readPayload() throws IOException, InterruptedException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            // Try to read everything in one go
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }
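        // The whole payload has arrived
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            // Update the server's received-packet statistics
            packetReceived();
            // Flip the buffer so it can be consumed
            incomingBuffer.flip();
            // If no ConnectRequest has been processed yet, the first packet must be one
            if (!initialized) {
                readConnectRequest();
            }
            // Otherwise handle it as a regular request
            else {
                readRequest();
            }
            // Reset state, ready to read the next packet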
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }
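
The two-buffer reading scheme above (a fixed 4-byte lenBuffer, then a payload buffer sized from it) also copes with data arriving in arbitrary chunks. Here is a self-contained sketch of that state machine. `FrameReader` and `feed` are hypothetical names, byte arrays stand in for non-blocking socket reads, and unlike the real server it does not validate the length value.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    // Stand-in for sock.read(incomingBuffer): copy as many bytes as fit.
    private void readInto(ByteBuffer src) {
        while (src.hasRemaining() && incomingBuffer.hasRemaining()) {
            incomingBuffer.put(src.get());
        }
    }

    // Feeds one chunk of bytes and returns the payloads completed by it.
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> completed = new ArrayList<>();
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (src.hasRemaining()) {
            readInto(src);
            if (incomingBuffer.hasRemaining()) {
                break; // chunk exhausted mid-frame, wait for more data
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // Length prefix complete: allocate a buffer for the payload
                incomingBuffer = ByteBuffer.allocate(incomingBuffer.getInt());
            } else {
                // Payload complete: hand it over and go back to reading a length
                byte[] payload = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(payload);
                completed.add(payload);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        FrameReader reader = new FrameReader();
        System.out.println(reader.feed(new byte[] {0, 0, 0, 2, 7}).size()); // 0, frame incomplete
        System.out.println(reader.feed(new byte[] {8}).size());             // 1, payload {7, 8}
    }
}
```

Switching `incomingBuffer` between the two buffers is what lets a single read path serve both the length prefix and the body.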

 The server has now received our ConnectRequest and handles it as follows:

 

 private void readConnectRequest() throws IOException, InterruptedException {
        if (zkServer == null) {
            throw new IOException("ZooKeeperServer not running");
        }
        // Start the processing chain for the ConnectRequest
        zkServer.processConnectRequest(this, incomingBuffer);
        // Done: the connection is now considered initialized
        initialized = true;
    }

 In detail:

 

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // A ConnectRequest packet has no header, so deserialize the body directly
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect");
        ...
        boolean readOnly = false;
        try {
            // The client's read-only flag
            readOnly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
          ....
        }
        ...
        // Read the session parameters requested by the client and clamp the timeout to the server's bounds
        int sessionTimeout = connReq.getTimeOut();
        byte passwd[] = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
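        // Stop reading further requests until the session is established
        cnxn.disableRecv();
        // Session id sent by the client
        long sessionId = connReq.getSessionId();
        // A non-zero id means the client is reconnecting and wants to renew an existing session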
        if (sessionId != 0) {
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            serverCnxnFactory.closeSession(sessionId);
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            // Create a new session
            createSession(cnxn, passwd, sessionTimeout);
        }
    }
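
The timeout negotiation in processConnectRequest is a simple clamp. The sketch below is illustrative: the class and method names are hypothetical, and the bounds assume a tickTime of 2000 ms together with the documented defaults of 2 × tickTime for the minimum and 20 × tickTime for the maximum. A differently configured server would use other bounds.

```java
class SessionTimeoutSketch {
    // Clamp the requested session timeout into [min, max].
    static int negotiate(int requested, int minSessionTimeout, int maxSessionTimeout) {
        int sessionTimeout = requested;
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        return sessionTimeout;
    }

    public static void main(String[] args) {
        int tickTime = 2000;    // assumed server setting
        int min = 2 * tickTime; // 4000
        int max = 20 * tickTime; // 40000
        // Executor asks for 3000 ms, which is below the assumed minimum
        System.out.println(negotiate(3000, min, max)); // 4000
    }
}
```

Under these assumptions the 3000 ms requested by Executor would be raised to 4000 ms, which is why the client must use the timeout returned in the ConnectResponse instead of the value it asked for.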

 A new session is created as follows:

 

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        // Create the session on the server; session ids are allocated incrementally
        long sessionId = sessionTracker.createSession(timeout);
        // Generate the password from a Random seeded with the session id
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        // Every server-side connection is tagged with its unique session id
        cnxn.setSessionId(sessionId);
        // Submit the request to the processor chain
        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }
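
Because the password comes from a Random seeded with `sessionId ^ superSecret`, it is fully determined by the session id, so a server that knows the secret can recompute it instead of storing it. The sketch below illustrates that property only. The secret value, the 16-byte length, and the `checkPasswd` helper are assumptions made for this example, not a copy of the server code.

```java
import java.util.Arrays;
import java.util.Random;

class SessionPasswdSketch {
    // Placeholder secret used by this sketch only
    static final long SECRET = 0x5EC2E7L;

    // Same seed, same bytes: the password is a pure function of the session id.
    static byte[] generatePasswd(long sessionId) {
        Random r = new Random(sessionId ^ SECRET);
        byte[] passwd = new byte[16];
        r.nextBytes(passwd);
        return passwd;
    }

    // Hypothetical check for a reconnecting client: recompute and compare.
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        byte[] passwd = generatePasswd(42L);
        System.out.println(checkPasswd(42L, passwd)); // true
        System.out.println(checkPasswd(43L, passwd)); // false
    }
}
```

Note that java.util.Random is not a cryptographically secure generator, so this scheme only guards against accidental session mix-ups, not a determined attacker.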

The submission step:

private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }

  The server now runs its processor chain. The argument is the internal Request object, whose type at this point is OpCode.createSession:

 

public void submitRequest(Request si) {
       ......
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                // Hand the request to the first processor; processors usually work asynchronously for better throughput
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
       ......
    }

 The first processor, PrepRequestProcessor, runs:

 

public void run() {
        try {
            while (true) {
                Request request = submittedRequests.take();
                ......
                pRequest(request);
            }
      ......
    }

 Its handling of createSession:

 

//create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                // The Request's header and txn are built here for the later processors
                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                break;
		......
	request.zxid = zks.getZxid();
        // Pass the request on to the next processor, usually asynchronously for better throughput
        nextProcessor.processRequest(request);
 // The matching branch inside pRequest2Txn:
 case OpCode.createSession:
                // Read the session timeout
                request.request.rewind();
                int to = request.request.getInt();
                // Build the concrete Record, here a CreateSessionTxn, for the later processors
                request.txn = new CreateSessionTxn(to);
                request.request.rewind();
                zks.sessionTracker.addSession(request.sessionId, to);
                zks.setOwner(request.sessionId, request.getOwner());
                break;

  As shown above, PrepRequestProcessor is mainly responsible for filling in the Request's header and txn fields; it acts as a preprocessing step.

The second processor, SyncRequestProcessor, handles the request:

 

int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
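                // If the flush list is empty, every earlier request has been handled, so block for the next one
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                }
                // Otherwise requests are still waiting to be flushed. Poll without blocking:
                // under light load nothing new arrives and the backlog is flushed right away;
                // under heavy load the flush may wait until more than 1000 requests pile up.
                else {
                    si = queuedRequests.poll();
                    // The input queue is idle: flush the backlog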
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
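                    // Append the Request to the log output stream (serialize, then append).
                    // Note that it has not been flushed to disk yet; it is still in memory.
                    if (zks.getZKDatabase().append(si)) {
                        // Count of successfully appended requests
                        logCount++;
                        // Once enough requests have been appended, roll the log and start a
                        // thread that asynchronously writes the in-memory database and session
                        // state to a snapshot file, which acts as a checkpoint.
                        // snapCount defaults to 100000
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log: flush the in-memory log and roll over to a new log file
                            zks.getZKDatabase().rollLog();
                            // take a snapshot: a separate thread writes the in-memory
                            // database and sessions to a snapshot file asynchronously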
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new Thread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    }
                    // If this is a read request (nothing was appended) and the flush list is empty, pass it straight on
		    else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                        continue;
                    }
                    // A write request was appended to the log stream above; add it to the flush list for batch processing
                    toFlush.add(si);
                    // Under heavy load the flush may wait until more than 1000 requests have accumulated;
                    // only after the flush can the later processors handle them
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }

 The flush itself:

 

private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
        // Flush the appended log records to disk, closing old log file handles along the way
        zks.getZKDatabase().commit();
        // With the log flushed, process the requests in the flush list
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            // Run the next processor
            nextProcessor.processRequest(i);
        }
        if (nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }
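
The batching behaviour of SyncRequestProcessor is a form of group commit: many appended writes share one disk flush, and a request only moves downstream after the flush that covers it. The following sketch models just that idea with hypothetical names; a counter stands in for the disk commit and strings stand in for requests.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class GroupCommitSketch {
    private final LinkedList<String> toFlush = new LinkedList<>();
    final List<String> delivered = new ArrayList<>(); // what the next processor has seen
    int commits = 0;                                  // number of simulated disk flushes
    private final int maxBatch;

    GroupCommitSketch(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    // A write arrives: it is appended in memory and queued for flushing.
    void onWrite(String request) {
        toFlush.add(request);
        if (toFlush.size() > maxBatch) {
            flush(); // heavy load: flush once the batch grows past the limit
        }
    }

    // The input queue is empty: flush whatever has accumulated.
    void onIdle() {
        flush();
    }

    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        commits++; // one commit covers the whole batch
        while (!toFlush.isEmpty()) {
            delivered.add(toFlush.remove());
        }
    }

    public static void main(String[] args) {
        GroupCommitSketch sync = new GroupCommitSketch(3);
        for (String r : new String[] {"a", "b", "c", "d"}) {
            sync.onWrite(r);
        }
        System.out.println(sync.commits + " commit(s) for " + sync.delivered.size() + " requests"); // 1 commit(s) for 4 requests
    }
}
```

The trade-off is latency for throughput: under load a request may wait for its batch, but the cost of each disk flush is spread over many requests.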

 Assume the system is lightly loaded, so our ConnectRequest is processed right away and reaches FinalRequestProcessor:

 

if (request.hdr != null) {
               TxnHeader hdr = request.hdr;
               Record txn = request.txn;
               // Transactional requests are applied here
               rc = zks.processTxn(hdr, txn);
            }

 In detail:

 

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        // Delegate to the database to apply the transaction
        rc = getZKDatabase().processTxn(hdr, txn);
        // For session creation, add the session
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst
                        .getTimeOut());
      ......
        return rc;
    }

 public ProcessTxnResult processTxn(TxnHeader header, Record txn)

    {
        // Build a result object here to hand back to FinalRequestProcessor
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
	......

 Back in FinalRequestProcessor, the result from the database is processed further:

 

case OpCode.createSession: {
                zks.serverStats().updateLatency(request.createTime);

                lastOp = "SESS";
                cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                        request.createTime, System.currentTimeMillis());
                // The response is written back here
                zks.finishSessionInit(request.cnxn, true);
                return;
            }

 public void finishSessionInit(ServerCnxn cnxn, boolean valid) {

        ......
            // Build the response: the negotiated sessionTimeout, the unique sessionId, and the client's password
            ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                            // longer valid
                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            // Write -1 as a placeholder for the length
            bos.writeInt(-1, "len");
            // Serialize the body
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool(
                        this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // Overwrite the -1 placeholder with the real length
            bb.putInt(bb.remaining() - 4).rewind();
            // Write it back through the channel
            cnxn.sendBuffer(bb);    

            ......
            // Re-enable read interest on the selector
            cnxn.enableRecv();
        ......
    }

 The actual write-back, in the communication layer NIOServerCnxn:

 

public void sendBuffer(ByteBuffer bb) {
        try {
            if (bb != ServerCnxnFactory.closeConn) {
                // We check if write interest here because if it is NOT set,
                // nothing is queued, so we can try to send the buffer right
                // away without waking up the selector
                // OP_WRITE interest is not set, so nothing is queued and we can try to write directly
                if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
                    try {
                        // Write back to the client
                        sock.write(bb);
                    } catch (IOException e) {
                        // we are just doing best effort right now
                    }
                }
                // if there is nothing left to send, we are done
                // Everything was written in one go
                if (bb.remaining() == 0) {
                    packetSent();
                    return;
                }
            }
            // Not fully written in one go: add the buffer to the output queue so the write continues later
            synchronized(this.factory){
                sk.selector().wakeup();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
                            + " is valid: " + sk.isValid());
                }
                outgoingBuffers.add(bb);
                if (sk.isValid()) {
                    sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
                }
            }
            
        .......
    }
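
The pattern in sendBuffer is "try a direct write when nothing is queued, otherwise enqueue". Here is a simplified, hedged model of it. `BestEffortSender` and `LimitedChannel` are hypothetical classes invented for this sketch; an empty queue stands in for "OP_WRITE is not set", and the selector wake-up is only mentioned in a comment.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model (not ZooKeeper code) of best-effort direct write plus queueing.
class BestEffortSender {
    private final WritableByteChannel channel;
    private final Deque<ByteBuffer> outgoing = new ArrayDeque<>();

    BestEffortSender(WritableByteChannel channel) { this.channel = channel; }

    /** Returns true if the whole buffer was written immediately. */
    synchronized boolean send(ByteBuffer bb) {
        if (outgoing.isEmpty()) {        // stands in for "OP_WRITE is not set"
            try {
                channel.write(bb);       // may consume only part of the buffer
            } catch (IOException e) {
                // best effort only, as in the original
            }
        }
        if (!bb.hasRemaining()) {
            return true;                 // done in one write
        }
        // Leftover bytes (or an earlier buffer still pending): queue it to keep ordering.
        // The real code also sets OP_WRITE and wakes up the selector here.
        outgoing.add(bb);
        return false;
    }

    synchronized int queued() { return outgoing.size(); }
}

// Test double: a channel that accepts at most maxPerWrite bytes per call.
class LimitedChannel implements WritableByteChannel {
    private final int maxPerWrite;
    LimitedChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }
    public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        src.position(src.position() + n);
        return n;
    }
    public boolean isOpen() { return true; }
    public void close() { }
}
```

Skipping the direct write while something is already queued is what preserves ordering: a later buffer must not overtake the unsent tail of an earlier one.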

At this point the server side has finished its work and has returned a ConnectResponse object to the client. The client's SendThread handles the server's response as follows:

 

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {
            // read into incomingBuffer; on the first pass this is the packet length, a single int
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
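            // the buffer is full. Note that every packet takes two reads: the length first, then the body.
            // The incomingBuffer reference is reused: it points at lenBuffer for the first read
            // and at a newly allocated body buffer for the second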
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                // what we just read was the length
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    // allocate an incomingBuffer as large as the packet length
                    readLength();
                } 
                // not initialized yet, i.e. the session is not established, so the server's reply must be a ConnectResponse
		else if (!initialized) {
                    // read the ConnectResponse, i.e. deserialize the contents of incomingBuffer into a ConnectResponse object
                    readConnectResult();
                    // keep reading subsequent responses
                    enableRead();
                    // if there are requests waiting to be sent, make sure the write event is enabled
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    // get ready to read the next response
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    // session establishment is complete
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
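
The two-phase read deserves a closer look, because "reading the same packet twice" really means two separate reads into two different buffers: the 4-byte length, then the body. A minimal sketch under simplifying assumptions: `FrameReader` and `readAll` are hypothetical names, a `ByteArrayInputStream` stands in for the socket, and there is no sanity check on the length (the real client validates it in readLength).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader (not ZooKeeper code): length phase, then body phase.
class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;   // starts in the "length" phase
    final List<byte[]> frames = new ArrayList<>();

    /** Call when the channel is readable; returns false at end of stream. */
    boolean onReadable(ReadableByteChannel ch) throws IOException {
        int rc = ch.read(incomingBuffer);
        if (rc < 0) {
            return false;
        }
        if (!incomingBuffer.hasRemaining()) {        // current phase fully read
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // phase 1 done: switch to a body buffer of exactly that size
                incomingBuffer = ByteBuffer.allocate(lenBuffer.getInt());
            } else {
                // phase 2 done: hand over the body, then go back to the length phase
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
        return true;
    }

    /** Convenience for experiments: feed a byte array through the reader. */
    static List<byte[]> readAll(byte[] wire) {
        FrameReader reader = new FrameReader();
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(wire));
        try {
            while (reader.onReadable(ch)) { /* keep reading until end of stream */ }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return reader.frames;
    }
}
```

Because a partial read simply leaves the buffer with space remaining, the same method can be called again on the next readable event and picks up where it left off, which is why this shape suits non-blocking IO.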

The actual read:

 

void readConnectResult() throws IOException {
        .....
        // deserialize incomingBuffer into a ConnectResponse
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }
        // the sessionId returned by the server
        this.sessionId = conRsp.getSessionId();
        // follow-up processing: initialize some client parameters and finally fire a WatchedEvent
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

The follow-up processing looks like this:

 

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            ......
            // initialize the session-related parameters on the client side
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            // switch the client state to CONNECTED (or CONNECTEDREADONLY)
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
            // queue a SyncConnected event (ConnectedReadOnly for a read-only server);
            // a dedicated EventThread notifies the registered watchers asynchronously
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }
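
To make the two timeout formulas in onConnected concrete, here is a small worked example. `SessionTimeouts` is a hypothetical class, and the figures (a negotiated timeout of 6000 ms and 3 servers in the connect string) are assumptions chosen only for the arithmetic.

```java
// Hypothetical helper (not ZooKeeper code) mirroring the two formulas in onConnected.
class SessionTimeouts {
    final int readTimeout;
    final int connectTimeout;

    SessionTimeouts(int negotiatedSessionTimeoutMs, int serverCount) {
        // the client treats the server as unreachable after 2/3 of the session timeout
        this.readTimeout = negotiatedSessionTimeoutMs * 2 / 3;
        // the connect budget is split evenly across the known servers
        this.connectTimeout = negotiatedSessionTimeoutMs / serverCount;
    }
}
```

With 6000 ms and 3 servers this gives a readTimeout of 4000 ms and a connectTimeout of 2000 ms. Both are integer divisions, so a 4000 ms session would give a readTimeout of 2666 ms.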

How the EventThread handles it:

 

       public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            // keep the EventThread's view of the session state in sync
            sessionState = event.getState();

            // materialize the watchers based on the event
            // work out which watchers need to be notified; the EventThread's run loop then just calls each watcher's process method
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            // hand the pair over to the queue for asynchronous processing
            waitingEvents.add(pair);
        }

The EventThread main loop:

 

public void run() {
           try {
              isRunning = true;
              while (true) {
                 // take the next event (blocks while the queue is empty)
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    // process it
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down");
        }

The actual processing:

 

if (event instanceof WatcherSetEventPair) {
                  // each watcher will process the event
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } 
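
The queue-plus-"event of death" structure of the EventThread is a general pattern: producers enqueue work, one consumer thread drains it, and a sentinel object asks the thread to exit once the queue is empty. A minimal sketch, assuming plain strings as events; `MiniEventThread` and its methods are hypothetical names, not ZooKeeper's API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature (not ZooKeeper code) of an event thread with a poison pill.
class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    void queueEvent(String event) { waitingEvents.add(event); }
    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take();     // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    handled.add((String) event);         // stands in for watcher.process(event)
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;                               // drained after the pill: exit
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queues two events and the pill, waits for the thread, returns what was handled. */
    static List<String> demo() {
        MiniEventThread t = new MiniEventThread();
        t.start();
        t.queueEvent("SyncConnected");
        t.queueEvent("NodeCreated");
        t.queueEventOfDeath();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t.handled;
    }
}
```

Since the queue is FIFO, everything queued before the pill is handled before the thread exits, so watchers are not silently dropped on shutdown.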

In our example this calls the process method of the Executor watcher, which delegates to DataMonitor, and DataMonitor does nothing for SyncConnected:

 

case SyncConnected:
				// In this particular example we don't need to do anything
				// here - watches are automatically re-registered with
				// server and any watches triggered while the client was
				// disconnected will be delivered (in order of course)
				break;
 

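That's it: the session between client and server is now established, and the actual business operations can begin. This example has shown how client and server exchange data; later requests such as create, get, set and delete follow a similar flow.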

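The core flow of session establishment:

1. The TCP connection is created.

2. The client sends a ConnectRequest packet.

3. The server receives the ConnectRequest packet, creates the session, and returns the server-side sessionId to the client.

4. The client receives the server's response and fires the corresponding SyncConnected state event.

5. The client-side watcher consumes the event.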

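Comments
#1 灰色回忆 2016-02-19
Hi,
I have a question about the following sentence in your post.
I don't understand how the same packet gets read twice.
// the buffer is full. Note that every packet takes two reads: the length first, then the body; incomingBuffer is reused   if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();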
