Bitcoin の P2P に Scala で通信してみる(前編)

お知らせ
Table of Contents

bitcoind は RPC で一通りの情報が取れますが、そこをちょっと踏み込んで Scala で P2P を実装して Bitcoin の Peer のフリをしてみようと思います。
細かくやると大変なので 「とりあえず繋がる」 を目標にします。

P2P のプロトコルについては公式ドキュメントなどでちゃんと解説があるので、何となく分かった気にはなれますが、「で、実際どうすんの?」っていうところで二の足を踏んでいる人に少しでもお役に立てればと思います。

Scala やそのライブラリについての深い解説はしませんので、あしからず。

以下の手順で進めます。

  1. 接続してバイト列をやりとりする
  2. バイト列をパースする (← 前編ではここまで)
  3. バージョン番号などのハンドシェイク

接続してバイト列をやりとりする

Scala ではとりあえずメジャーな Akka Streams を使います。
いろいろな組み合わせが出来すぎてなんだか分かりにくいライブラリですが、使い所を押さえれば便利で頼もしいライブラリです。
ここでは現行バージョンの 2.6.0 を使います。

準備として必要なパッケージをインポートしておきます。

import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.duration._

入力と出力の形態を定義

bitcoind からの送信を延々とバイト列として受け取って、こちらからも延々とバイト列を送りたいので、Sink(入力) と Source(出力) を以下のように定義します。
※ このネーミングが分かり難い。Source は 「接続先に提供するソース」 って意味ですかね。

val timeout = 10 seconds // 適宜調整
val sink = StreamConverters.asInputStream(timeout)

val queueSize = 8 // 適宜調整
val source = Source.queue[ByteString](queueSize, OverflowStrategy.dropNew)

OverflowStrategy は処理の適性で選択するのですが、dropNew が分かり易いと思います。

接続

用意した SinkSouce とを組み合わせて接続します。
この時 ActorSystem が必要になりますので、適当(切)なヤツを作成しておきます。通常はデフォルト設定で充分です。
Akka 2.5 まではここで Materializer が必須でしたが、2.6 からはデフォルトが用意されるようになりました。

implicit val actorSystem = akka.actor.ActorSystem("sample")

val host: String = ...
val port: Int = ...
val tcp = Tcp().outgoingConnection(host, port)
val (queue, ins) = Flow[ByteString].runWith(source via tcp, sink)

これで接続完了です。
出来上がった以下の2つを使って通信します。

  • queue: SourceQueueWithComplete[ByteString]
    offer というメソッドで ByteString を接続先に送ってくれます。
  • ins: java.io.InputStream
    接続先から送られてきたバイト列を読み込んでいくために使います。

InputStream を Stream に変換

場合によっては java.io.InputStream のままでもいいのですが、Scala での使い易さを考えて scala.collection.immutable.Stream に変換します。これにより無限長のバイト配列として扱うことができます。

val bufIns = new java.io.BufferedInputStream(ins)
val infBytes = Stream.continually(bufIns.read()).takeWhile(_ != -1).map(_.toByte)

readメソッドにより1バイトずつ読み込んでいるので一見非効率に見えますが BufferedInputStream が内部バッファにため込んでくれているので大丈夫です。コンストラクタの第2引数を省略すればメモリサイズに応じたバッファを作ってくれます。気になる人は自分の運用に応じた値を指定して下さい。

バイト列をパースする

ここからが本番です。
受け取ったバイト列をプロトコルに従って解析し意味のある構造にします。

こちらのページが詳しいです。 https://en.bitcoin.it/wiki/Protocol_documentation

パケットの構造

サイズ 名前 説明
4 magic UInt32 各P2Pネットワークで定まっている値です。 Bitcoin の testnet3 なら 0x0709110B です。
12 command String payload に入っているメッセージの種別名です。 12バイトよりも短い場合は 0x00 で埋められます。
4 length UInt32 payload の長さです。
4 checksum UInt32 payload の double-sha256 の先頭 4 バイトです。
可変 payload ByteString メッセージ本体です。 command にある種別名に従って解析します。

Bitcoin なので(?) UInt32 は Little Endian です。magic は "0B 11 09 07" となり、length が300(0x12C)バイトなら "2C 01 00 00" となります。ややこしいのは checksum で反転の反転になるのか double-sha256 のバイト列の先頭4バイトがそのままセットされます。

これによると、パケットを受け取ったら

  1. まず先頭の24バイト(4+12+4+4)を解析し
  2. magic をチェックした後で
  3. length の長さ分だけ続きを読み込み
  4. その double-sha256 を checksum でチェックし
  5. command の種別名で解析

すれば良いことが分かります。

パーサの作成

準備

関数型でパースと言えば、

ByteString => Option[(A, ByteString)]

つまり、ByteString を受け取って、型Aとしてパース出来れば、A と残りの ByteString の組を返す、というのが定石だと思います。
ここではこれに従ってパース用のメソッドを作っていきます。パーサは ParseByteString を実装することにします。

trait ParseByteString[A] {
  def parse(bs: ByteString): Option[(A, ByteString)]
}

次に ByteString のユーティリティメソッドのために以下の implicit class を任意の package object 内に作ります。
もちろんこのパッケージをインポートするのを忘れずに!

implicit class ByteStringUtil(private val bs: ByteString) extends AnyVal {
  // ここにユーティリティメソッドを追加
}

まず定義するのは固定長のパーツの読み込みです。flen長の ByteStringA に変換します。

def parseFixed[A](len: Int)(f: ByteString => A): Option[(A, ByteString)] = {
  val (a, b) = bs.splitAt(len)
  if (a.size < len) None else Some(f(a) -> b)
}

double-sha256 も hash256 として定義しておきます。

def hash256 = {
  def sha256(b: Array[Byte]) = java.security.MessageDigest.getInstance("SHA-256").digest(b)
  ByteString(sha256(sha256(bs.toArray)))
}

case class の用意

パケットの構造を表現するための case class を定義していきます。
各クラスはバイト列にもできるように、以下の CanBeByteString を実装することとします。

trait CanBeByteString {
  def toByteString: ByteString
  protected def concat(xs: CanBeByteString*): ByteString =
    xs.foldLeft(new akka.util.ByteStringBuilder)(_ append _.toByteString).result
}

まずはヘッダに使われているパーツです。ズラズラと長いですが、よく見ると単調なことに気付くと思います。

case class Magic(toByteString: ByteString) extends CanBeByteString
object Magic extends ParseByteString[Magic] {
  val length = 4
  def parse(bs: ByteString) = bs.parseFixed(length)(Magic(_))
  val testnet3 = Magic(ByteString.fromInts(0x0B, 0x11, 0x09, 0x07))
}
case class Checksum(toByteString: ByteString) extends CanBeByteString
object Checksum extends ParseByteString[Checksum] {
  val length = 4
  def parse(bs: ByteString) = bs.parseFixed(length)(Checksum(_))
  def calc(bs: ByteString) = Checksum(bs.hash256.take(length))
}
case class BSLength(toByteString: ByteString) extends CanBeByteString {
  lazy val value = BigInt(toByteString.reverse.toArray)
}
object BSLength extends ParseByteString[BSLength] {
  val length = 4
  def parse(bs: ByteString) = bs.parseFixed(length)(BSLength(_))
  def bySizeOf(bs: ByteString) = {
    val v = BigInt(bs.size)
    val array = v.toByteArray.reverse.padTo(length, 0.toByte).take(length)
    BSLength(ByteString(array))
  }
}
case class CommandName(toByteString: ByteString) extends CanBeByteString {
  lazy val string = toByteString.utf8String.trim
  override def toString = string
}
object CommandName extends ParseByteString[CommandName] {
  val length = 12
  def parse(bs: ByteString) = bs.parseFixed(length)(CommandName(_))
  def apply(s: String): CommandName = {
    val array = s.getBytes.padTo(length, 0.toByte).take(length)
    CommandName(ByteString(array))
  }
}

これらのパーツを組み合わせてヘッダを作ります。

case class Header(
  magic: Magic,
  command: CommandName,
  length: BSLength,
  checksum: Checksum
) extends CanBeByteString {
  lazy val toByteString = concat(magic, command, length, checksum)
}
object Header extends ParseByteString[Header] {
  val length = Magic.length + CommandName.length + UInt32.length + Checksum.length

  def parse(bs: ByteString) = for {
    (magic, next) <- Magic.parse(bs)
    (command, next) <- CommandName.parse(next)
    (length, next) <- BSLength.parse(next)
    (checksum, next) <- Checksum.parse(next)
  } yield Header(magic, command, length, checksum) -> next
}

パケットの解析

最後にヘッダをチェックしながらメッセージを読み込みます。
PacketStream[Byte] をパースするので、ParseByteString とは違います。

Stream は無限長なので、ここに出てくる nextByteString にしようとするとずーーっと終わらないので気を付けて下さい。

case class Packet(
  header: Header,
  body: Message
) extends CanBeByteString {
  def toByteString = concat(header, body)
}
object Packet {
  private def splitStream(len: Int)(s: Stream[Byte]): Option[(ByteString, Stream[Byte])] = {
    val (a, b) = s.splitAt(len)
    val bs = ByteString(a.toArray)
    if (bs.size < len) None else Some(bs -> b)
  }
  def parseStream(stream: Stream[Byte])(implicit magic: Magic): Option[(Packet, Stream[Byte])]  = for {
    (headerBs, next) <- splitStream(Header.length)(stream)
    (header, left) <- Header.parse(headerBs)
    if left.isEmpty
    if header.magic == magic
    (payload, next) <- splitStream(header.length.value.toInt)(next)
    if header.checksum == Checksum.calc(payload)
    mp <- Message.getParser(header.command.toString)
    (body, left) <- mp.parse(payload)
    if left.isEmpty
  } yield Packet(header, body) -> next
}

left.isEmpty のチェックは、残らず消費していることを確認しています。

Message は以下のように定義されているものとします。

trait Message extends CanBeByteString {
  val name: String
}
object Message {
  def getParser(name: String): Option[ParseByteString[_ <: Message]] = ???
}

パケットの作成

パケットの受信が出来るようになりましたので、次は送信のために Packet を作成するメソッドをコンパニオンオブジェクトに定義します。
作成された PackettoByteString を、接続している queueoffer するわけです。

def apply(msg: Message)(implicit magic: Magic): Packet = {
  val payload = msg.toByteString
  val header = Header(
    magic = magic,
    command = CommandName(msg.name)
    length = BSLength.bySizeOf(payload),
    checksum = Checksum.calc(payload)
  )
  Packet(header, msg)
}

ここまでで、P2P のパケットを送受信する事が出来るようになりました。
後編では実際に Message を定義してやりとりしながら、ハンドシェイクをしていきます。

タイトルとURLをコピーしました