Skip to content

Commit

Permalink
pcapng - moving towards comman CaptureFile
Browse files Browse the repository at this point in the history
  • Loading branch information
nikiforo committed Aug 12, 2022
1 parent d9d24ed commit c03372a
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ trait Block
object Block {

// format: off
// It's possible to use Codec[Unit :: Long :: LB] for all other than Header blocks
def codec[L <: HList, LB <: HList](hexConstant: ByteVector)(f: Length => Codec[L])(
implicit
prepend: Prepend.Aux[L, Unit :: HNil, LB],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.protocols.pcapng

import cats.effect.MonadCancelThrow
import fs2.interop.scodec.StreamDecoder
import fs2.protocols.pcap.LinkType
import fs2.timeseries.TimeStamped
import fs2.{Pipe, Pull, Stream}
import scodec.bits.ByteVector

object CaptureFile {

val streamDecoder: StreamDecoder[BodyBlock] =
StreamDecoder.once(SectionHeaderBlock.codec).flatMap { shb =>
StreamDecoder.many(BodyBlock.decoder(shb.ordering))
}

def parse[F[_]: MonadCancelThrow, A](
f: (LinkType, ByteVector) => Option[A]
): Pipe[F, Byte, TimeStamped[A]] = { bytes =>
def go(
idbs: Vector[InterfaceDescriptionBlock],
blocks: Stream[F, BodyBlock]
): Pull[F, TimeStamped[A], Unit] =
blocks.pull.uncons1.flatMap {
case Some((idb: InterfaceDescriptionBlock, tail)) =>
go(idbs :+ idb, tail)
case Some((epb: EnhancedPacketBlock, tail)) =>
val idb = idbs(epb.interfaceId.toInt)
val ts = ((epb.timestampHigh << 32) | epb.timestampLow) * idb.if_tsresol
val timeStamped = f(idb.linkType, epb.packetData).map(TimeStamped(ts, _))
Pull.outputOption1(timeStamped) >> go(idbs, tail)
case Some((_, tail)) => go(idbs, tail)
case None => Pull.done
}

bytes
.through(streamDecoder.toPipeByte)
.through(go(Vector.empty, _).stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,24 @@
package fs2.protocols
package pcapng

import pcap._
import fs2.protocols.pcap._
import scodec.Codec
import scodec.bits._
import scodec.codecs._

import scala.concurrent.duration._

case class InterfaceDescriptionBlock(
length: Length,
linkType: LinkType,
snapLen: Long,
bytes: ByteVector
) extends BodyBlock
) extends BodyBlock {

// Should be extracted from options. Instead, we currently assume
// that required option wasn't found and fallback to a default value.
def if_tsresol: FiniteDuration = 1.microsecond
}

object InterfaceDescriptionBlock {

Expand All @@ -42,10 +49,10 @@ object InterfaceDescriptionBlock {
// format: off
def codec(implicit ord: ByteOrdering): Codec[InterfaceDescriptionBlock] =
"IDB" | Block.codec(hexConstant) { length =>
("LinkType" | guint16.xmap[LinkType](LinkType.fromInt, LinkType.toInt)) ::
("Reserved" | ignore(16)) ::
("SnapLen" | guint32) ::
("Block Bytes" | bytes(length.toLong.toInt - 20))
("LinkType" | guint16.xmap[LinkType](LinkType.fromInt, LinkType.toInt) ) ::
("Reserved" | ignore(16) ) ::
("SnapLen" | guint32 ) ::
("Block Bytes" | bytes(length.toLong.toInt - 20) )
}.dropUnits.as[InterfaceDescriptionBlock]
// format: on
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.protocols.pcapng

import scodec.Codec
import scodec.bits._
import scodec.codecs._

case class SimplePacketBlock(length: Length, bytes: ByteVector) extends BodyBlock

object SimplePacketBlock {

private def hexConstant(implicit ord: ByteOrdering) =
orderDependent(hex"00000003", hex"03000000")

def codec(implicit ord: ByteOrdering): Codec[ProcessInformationBlock] =
"SPB" | Block.ignoredCodec(hexConstant).as[ProcessInformationBlock]
}
29 changes: 14 additions & 15 deletions protocols/shared/src/test/scala/fs2/protocols/PcapNgExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,36 @@ package fs2
package protocols

import cats.effect.{IO, IOApp}
import fs2.interop.scodec.StreamDecoder
import fs2.io.file.{Files, Path}
import fs2.protocols.pcapng.{BodyBlock, DummyBlock, SectionHeaderBlock}
import scodec.Decoder
import cats.syntax.all._
import fs2.io.file.{Files, Path}
import fs2.protocols.pcap.LinkType
import fs2.protocols.pcapng.{CaptureFile, DummyBlock}

object PcapNgExample extends IOApp.Simple {

def run: IO[Unit] =
decode.compile.toList.flatMap(_.traverse_(IO.println))
output.compile.toList.flatMap(x => IO.println(x.length)) // _.traverse_(IO.println)

private def byteStream: Stream[IO, Byte] =
Files[IO].readAll(Path("/Users/anikiforov/pcapng/many_interfaces.pcapng"))

private def revealFailed =
byteStream
.through(streamDecoder.toPipeByte)
.through(CaptureFile.streamDecoder.toPipeByte)
.flatMap {
case dummy: DummyBlock => Stream.emit(dummy)
case _ => Stream.empty
}
.debug()

private def decode =
byteStream.through(streamDecoder.toPipeByte)

private val streamDecoder: StreamDecoder[BodyBlock] =
for {
hdr <- StreamDecoder.once(SectionHeaderBlock.codec)
fallback = DummyBlock.codec(hdr.ordering)
decoder = Decoder.choiceDecoder(BodyBlock.decoder(hdr.ordering), fallback)
block <- StreamDecoder.many(decoder)
} yield block
byteStream.through(CaptureFile.streamDecoder.toPipeByte)

private def output =
byteStream.through(
CaptureFile.parse {
case (LinkType.Ethernet, bv) => bv.some
case _ => none
}
)
}

0 comments on commit c03372a

Please sign in to comment.