edo1z blog

プログラミングなどに関するブログです

Go言語によるビットコインのフルノード実装btcdを調べる(2)

btcdを実行した際のプログラムの流れを最初から確認してみます。

server.Start()

btcdを実行すると、設定読み込んだり、データベースの準備したりしたあとに、サーバを作成し、起動します。サーバの起動は、server.goのStart()です。そこで、s.peerHandler()とs.rebroadcastHandler()と、s.rpcServer.Start()を実行してます。

  • s.peerHandler()は、サーバーへのピアの追加と削除、ピアの禁止、ピアへのメッセージのブロードキャストなどのピア操作を処理するために使用されます。
  • s.rebroadcastHandler()は、RPCサーバーが受信したユーザーtxがブロックに含まれるまで再ブロードキャストします。
  • s.rpcServer.Start()は、RPCリスナーをスタートします。

今回は、peerと接続し、ブロックをもらうまでの流れを確認したいと思います。とりあえずpeerHandler()を確認します。

peerHandler()

peerHandler()は、s.addrManager.Start()と、s.blockManager.Start()を実行してます。

addrManager()

addrManagerは、Address Managerという意味で、peerのアドレスを管理しているようです。addrManager.Start()は、まずloadPeers()を実行し、peers.jsonから既知のアドレスを読み込んでいます。その後、addressHandler()を実行しています。addressHandler()は、定期的に新しいアドレスを保存するっぽいです。P2Pなのでアドレスは常に複数の新鮮なやつをゲットしておく必要があります。savePeers()が実際にアドレスの保存をしているようです。addressHandler()は、savePeers()をdumpAddressIntervalに規定されている間隔(デフォルトで10分)で実行しつづけています。新しいアドレスは、DNSseedからではなく、他のpeerから教えてもらうのかなと思うのですが、どうなっているのかな?savePeers()では、AddrManager structのaddrNewを読み込んでるだけぽいので、別のところで、peerが送ってきたアドレスをこれに入れているのかもしれない。

blockManager()

blockManager.Start()は、blockHandler()を実行してます。blockHandlerはブロックマネージャのメインハンドラです。それはgoroutineとして実行する必要があります。それは、ブロック(MsgBlock)メッセージがメモリデータ構造をロックする必要なく単一のスレッドによって処理されるように、ピアハンドラとは別のゴルーチンでブロックメッセージとinvメッセージを処理します。これは、ブロックマネージャがどのブロックが必要か、またどのようにフェッチを進めるべきかを制御するので重要です。ブロックマネージャが、ブロックについて俺ここまで持ってんだけどお前は?みたいなやつのメッセージを作成したり、関連するメッセージを受信した場合のその解析等を行っているぽいです。

connManager()

peerHandler()は、addrManagerとblockManager以外に、connManagerもStart()してます。まさしく接続管理みたいな名前です。connManager.Start()の後は、チャネルの受信待ち状態になっています。connManager.Start()は、btcd/connmgr/connmanager.goにあります。connManager.Start()によって、connection managerを起動し、ネットワーク接続を開始します。connManager.Start()は、とりあえず、connHandler()を実行してます。connHandlerはすべての接続関連の要求を処理します。それはgoroutineとして実行する必要があります。connection handlerは、アクティブなアウトバウンド接続のプールを維持して、ネットワークに接続したままにします。接続要求は、割り当てられたIDによって処理され、マッピングされます。

connHandler()実行後に、cm.cfg.ListenersのListener毎にcm.listenHandler(Listener)を実行します。その後、cm.NewConnReq()を実行しています。

listenHandler()

cm.listenHandler()のコードは下記です。

// listenHandler accepts incoming connections on a given listener.  It must be
// run as a goroutine.
func (cm *ConnManager) listenHandler(listener net.Listener) {
    log.Infof("Server listening on %s", listener.Addr())
    for atomic.LoadInt32(&cm.stop) == 0 {
        conn, err := listener.Accept()
        if err != nil {
            // Only log the error if not forcibly shutting down.
            if atomic.LoadInt32(&cm.stop) == 0 {
                log.Errorf("Can't accept connection: %v", err)
            }
            continue
        }
        go cm.cfg.OnAccept(conn)
    }

    cm.wg.Done()
    log.Tracef("Listener handler done for %s", listener.Addr())
}

connManagerのConfig

connection managerのConfigは、下記があります。

Listeners       []net.Listener
OnAccept        func(net.Conn)
TargetOutbound  uint32
RetryDuration   time.Duration
OnConnection    func(*ConnReq, net.Conn)
OnDisconnection func(*ConnReq)
GetNewAddress   func() (net.Addr, error)
Dial            fun(net.Addr) (net.Conn, error)
  • Listenersは、接続マネージャーが接続の所有権を受け取り、接続を受け入れるリスナーのスライスを定義します。接続が受け入れられると、接続と共にOnAcceptハンドラが呼び出されます。接続マネージャーはこれらのリスナーのネーションをとるため、接続マネージャーが停止すると閉じられます。OnAcceptフィールドも指定されていない場合、このフィールドは効果がありません。呼び出し元が着信接続を待機したくない場合は、nilになることがあります。
  • OnAcceptは、インバウンド接続が受け入れられたときに起動されるコールバックです。接続を閉じるのは発信者の責任です。接続を閉じないと、接続マネージャが終了します。接続が依然としてアクティブであると信じており、したがって最大接続限界にまだカウントしているような望ましくない副作用を有する。このフィールドには、リスナーフィールドも指定されていないと効果がありません。その場合、受け入れ可能な接続が存在しない可能性があるからです。

  • OnConnectionは、新しいアウトバウンド接続が確立されたときに起動されるコールバックです。

  • GetNewAddressは、ネットワーク接続を確立するためのアドレスを取得する方法です。 nilの場合、新しい接続は自動的に行われません。

NewConnReq()

cm.NewConnReq()のコードは下記です。

// NewConnReq creates a new connection request and connects to the
// corresponding address.
func (cm *ConnManager) NewConnReq() {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }
    if cm.cfg.GetNewAddress == nil {
        return
    }

    c := &ConnReq{}
    atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))

    addr, err := cm.cfg.GetNewAddress()
    if err != nil {
        cm.requests <- handleFailed{c, err}
        return
    }

    c.Addr = addr

    cm.Connect(c)
}

// Connect assigns an id and dials a connection to the address of the
// connection request.
func (cm *ConnManager) Connect(c *ConnReq) {
    if atomic.LoadInt32(&amp;cm.stop) != 0 {
        return
    }
    if atomic.LoadUint64(&amp;c.id) == 0 {
        atomic.StoreUint64(&amp;c.id, atomic.AddUint64(&amp;cm.connReqCount, 1))
    }
    log.Debugf("Attempting to connect to %v", c)
    conn, err := cm.cfg.Dial(c.Addr)
    if err != nil {
        cm.requests <- handleFailed{c, err}
    } else {
        cm.requests <- handleConnected{c, conn}
    }
}
新しいアドレスに接続してその結果を、cm.requestsチャネルに入れてます。requestsチャネルの受信を待ち受けているのが、connHandler()です。接続成功した場合、cm.cfg.OnConnection(connReq, msg.conn)を実行してます。

newServer()

一番最初の、btcd/btcd.goでのserver.Start()の前にserverを、newServer()で作成していますが、このnewServer()内で、上記のonConnection()やOnAccept()やGetNewAddress()は設定されています。
cmgr, err := connmgr.New(&amp;connmgr.Config{
    Listeners:      listeners,
    OnAccept:       s.inboundPeerConnected,
    RetryDuration:  connectionRetryInterval,
    TargetOutbound: uint32(targetOutbound),
    Dial:           btcdDial,
    OnConnection:   s.outboundPeerConnected,
    GetNewAddress:  newAddressFunc,
})

newAddressFunc()

newAddressFunc()のコードは下記です。
newAddressFunc = func() (net.Addr, error) {
    for tries := 0; tries < 100; tries++ {
        addr := s.addrManager.GetAddress()
        if addr == nil {
            break
        }

                // Address will not be invalid, local or unroutable
        // because addrmanager rejects those on addition.
        // Just check that we don't already have an address
        // in the same group so that we are not connecting
        // to the same network segment at the expense of
        // others.
        key := addrmgr.GroupKey(addr.NetAddress())
        if s.OutboundGroupCount(key) != 0 {
            continue
        }

        // only allow recent nodes (10mins) after we failed 30
        // times
        if tries < 30 &amp;&amp; time.Since(addr.LastAttempt()) < 10*time.Minute {
            continue
        }

        // allow nondefault ports after 50 failed tries.
        if tries < 50 &amp;&amp; fmt.Sprintf("%d", addr.NetAddress().Port) !=
            activeNetParams.DefaultPort {
            continue
        }

        addrString := addrmgr.NetAddressKey(addr.NetAddress())
        return addrStringToNetAddr(addrString)
    }

    return nil, errors.New("no valid connect address")
}

outboundPeerConnected()

s.outboundPeerConnected()のコードは下記です。
// outboundPeerConnected is invoked by the connection manager when a new
// outbound connection is established.  It initializes a new outbound server
// peer instance, associates it with the relevant state such as the connection
// request instance and the connection itself, and finally notifies the address
// manager of the attempt.
func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
    sp := newServerPeer(s, c.Permanent)
    p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
    if err != nil {
        srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
        s.connManager.Disconnect(c.ID())
    }
    sp.Peer = p
    sp.connReq = c
    sp.AssociateConnection(conn)
    go s.peerDoneHandler(sp)
    s.addrManager.Attempt(sp.NA())
}

inboundPeerConnected()

s.inboundPeerConnected()のコードは下記です。
// inboundPeerConnected is invoked by the connection manager when a new inbound
// connection is established.  It initializes a new inbound server peer
// instance, associates it with the connection, and starts a goroutine to wait
// for disconnection.
func (s *server) inboundPeerConnected(conn net.Conn) {
    sp := newServerPeer(s, false)
    sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
    sp.AssociateConnection(conn)
    go s.peerDoneHandler(sp)
}
やっと、peerパッケージが登場しました。接続できたらNewOutBoundPeer()でアウトバウンド用peerを作成しています。peerパッケージは、peerを作って接続すると勝手に握手とかしだすらしいので、次はpeer同士の握手が確認できると思います。