diff --git a/protocols/shared/src/main/scala-2/fs2/protocols/pcapng/Block.scala b/protocols/shared/src/main/scala-2/fs2/protocols/pcapng/Block.scala index b18a1b1df6..9d84f200b5 100644 --- a/protocols/shared/src/main/scala-2/fs2/protocols/pcapng/Block.scala +++ b/protocols/shared/src/main/scala-2/fs2/protocols/pcapng/Block.scala @@ -33,6 +33,7 @@ trait Block object Block { // format: off + // It's possible to use Codec[Unit :: Long :: LB] for all other than Header blocks def codec[L <: HList, LB <: HList](hexConstant: ByteVector)(f: Length => Codec[L])( implicit prepend: Prepend.Aux[L, Unit :: HNil, LB], diff --git a/protocols/shared/src/main/scala/fs2/protocols/pcapng/CaptureFile.scala b/protocols/shared/src/main/scala/fs2/protocols/pcapng/CaptureFile.scala new file mode 100644 index 0000000000..8a60d7ef37 --- /dev/null +++ b/protocols/shared/src/main/scala/fs2/protocols/pcapng/CaptureFile.scala @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.protocols.pcapng + +import cats.effect.MonadCancelThrow +import fs2.interop.scodec.StreamDecoder +import fs2.protocols.pcap.LinkType +import fs2.timeseries.TimeStamped +import fs2.{Pipe, Pull, Stream} +import scodec.bits.ByteVector + +object CaptureFile { + + val streamDecoder: StreamDecoder[BodyBlock] = + StreamDecoder.once(SectionHeaderBlock.codec).flatMap { shb => + StreamDecoder.many(BodyBlock.decoder(shb.ordering)) + } + + def parse[F[_]: MonadCancelThrow, A]( + f: (LinkType, ByteVector) => Option[A] + ): Pipe[F, Byte, TimeStamped[A]] = { bytes => + def go( + idbs: Vector[InterfaceDescriptionBlock], + blocks: Stream[F, BodyBlock] + ): Pull[F, TimeStamped[A], Unit] = + blocks.pull.uncons1.flatMap { + case Some((idb: InterfaceDescriptionBlock, tail)) => + go(idbs :+ idb, tail) + case Some((epb: EnhancedPacketBlock, tail)) => + val idb = idbs(epb.interfaceId.toInt) + val ts = ((epb.timestampHigh << 32) | epb.timestampLow) * idb.if_tsresol + val timeStamped = f(idb.linkType, epb.packetData).map(TimeStamped(ts, _)) + Pull.outputOption1(timeStamped) >> go(idbs, tail) + case Some((_, tail)) => go(idbs, tail) + case None => Pull.done + } + + bytes + .through(streamDecoder.toPipeByte) + .through(go(Vector.empty, _).stream) + } +} diff --git a/protocols/shared/src/main/scala/fs2/protocols/pcapng/InterfaceDescriptionBlock.scala b/protocols/shared/src/main/scala/fs2/protocols/pcapng/InterfaceDescriptionBlock.scala index 2224082f60..c18f3c24d1 100644 --- a/protocols/shared/src/main/scala/fs2/protocols/pcapng/InterfaceDescriptionBlock.scala +++ b/protocols/shared/src/main/scala/fs2/protocols/pcapng/InterfaceDescriptionBlock.scala @@ -22,17 +22,24 @@ package fs2.protocols package pcapng -import pcap._ +import fs2.protocols.pcap._ import scodec.Codec import scodec.bits._ import scodec.codecs._ +import scala.concurrent.duration._ + case class InterfaceDescriptionBlock( length: Length, linkType: LinkType, snapLen: Long, bytes: ByteVector -) extends BodyBlock +) extends BodyBlock { + + // Should be extracted from options. Instead, we currently assume + // that required option wasn't found and fallback to a default value. + def if_tsresol: FiniteDuration = 1.microsecond +} object InterfaceDescriptionBlock { @@ -42,10 +49,10 @@ object InterfaceDescriptionBlock { // format: off def codec(implicit ord: ByteOrdering): Codec[InterfaceDescriptionBlock] = "IDB" | Block.codec(hexConstant) { length => - ("LinkType" | guint16.xmap[LinkType](LinkType.fromInt, LinkType.toInt)) :: - ("Reserved" | ignore(16)) :: - ("SnapLen" | guint32) :: - ("Block Bytes" | bytes(length.toLong.toInt - 20)) + ("LinkType" | guint16.xmap[LinkType](LinkType.fromInt, LinkType.toInt) ) :: + ("Reserved" | ignore(16) ) :: + ("SnapLen" | guint32 ) :: + ("Block Bytes" | bytes(length.toLong.toInt - 20) ) }.dropUnits.as[InterfaceDescriptionBlock] // format: on } diff --git a/protocols/shared/src/main/scala/fs2/protocols/pcapng/SimplePacketBlock.scala b/protocols/shared/src/main/scala/fs2/protocols/pcapng/SimplePacketBlock.scala new file mode 100644 index 0000000000..976d6fd2a6 --- /dev/null +++ b/protocols/shared/src/main/scala/fs2/protocols/pcapng/SimplePacketBlock.scala @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.protocols.pcapng + +import scodec.Codec +import scodec.bits._ +import scodec.codecs._ + +case class SimplePacketBlock(length: Length, bytes: ByteVector) extends BodyBlock + +object SimplePacketBlock { + + private def hexConstant(implicit ord: ByteOrdering) = + orderDependent(hex"00000003", hex"03000000") + + def codec(implicit ord: ByteOrdering): Codec[ProcessInformationBlock] = + "SPB" | Block.ignoredCodec(hexConstant).as[ProcessInformationBlock] +} diff --git a/protocols/shared/src/test/scala/fs2/protocols/PcapNgExample.scala b/protocols/shared/src/test/scala/fs2/protocols/PcapNgExample.scala index 78279afb3d..1a2b442546 100644 --- a/protocols/shared/src/test/scala/fs2/protocols/PcapNgExample.scala +++ b/protocols/shared/src/test/scala/fs2/protocols/PcapNgExample.scala @@ -23,23 +23,22 @@ package fs2 package protocols import cats.effect.{IO, IOApp} -import fs2.interop.scodec.StreamDecoder -import fs2.io.file.{Files, Path} -import fs2.protocols.pcapng.{BodyBlock, DummyBlock, SectionHeaderBlock} -import scodec.Decoder import cats.syntax.all._ +import fs2.io.file.{Files, Path} +import fs2.protocols.pcap.LinkType +import fs2.protocols.pcapng.{CaptureFile, DummyBlock} object PcapNgExample extends IOApp.Simple { def run: IO[Unit] = - decode.compile.toList.flatMap(_.traverse_(IO.println)) + output.compile.toList.flatMap(x => IO.println(x.length)) // _.traverse_(IO.println) private def byteStream: Stream[IO, Byte] = Files[IO].readAll(Path("/Users/anikiforov/pcapng/many_interfaces.pcapng")) private def revealFailed = byteStream - .through(streamDecoder.toPipeByte) + .through(CaptureFile.streamDecoder.toPipeByte) .flatMap { case dummy: DummyBlock => Stream.emit(dummy) case _ => Stream.empty @@ -47,13 +46,13 @@ object PcapNgExample extends IOApp.Simple { .debug() private def decode = - byteStream.through(streamDecoder.toPipeByte) - - private val streamDecoder: StreamDecoder[BodyBlock] = - for { - hdr <- StreamDecoder.once(SectionHeaderBlock.codec) - fallback = DummyBlock.codec(hdr.ordering) - decoder = Decoder.choiceDecoder(BodyBlock.decoder(hdr.ordering), fallback) - block <- StreamDecoder.many(decoder) - } yield block + byteStream.through(CaptureFile.streamDecoder.toPipeByte) + + private def output = + byteStream.through( + CaptureFile.parse { + case (LinkType.Ethernet, bv) => bv.some + case _ => none + } + ) }