diff --git a/dataflowengineoss/src/main/scala/io/joern/dataflowengineoss/semanticsloader/Semantics.scala b/dataflowengineoss/src/main/scala/io/joern/dataflowengineoss/semanticsloader/Semantics.scala index f658a153d571..1a67ec60c42a 100644 --- a/dataflowengineoss/src/main/scala/io/joern/dataflowengineoss/semanticsloader/Semantics.scala +++ b/dataflowengineoss/src/main/scala/io/joern/dataflowengineoss/semanticsloader/Semantics.scala @@ -2,24 +2,88 @@ package io.joern.dataflowengineoss.semanticsloader import io.shiftleft.codepropertygraph.generated.Cpg import io.shiftleft.codepropertygraph.generated.nodes.Method +import io.shiftleft.semanticcpg.language.* trait Semantics { /** Useful for `Semantics` that benefit from having some kind of internal state tailored to the current CPG. */ - def initialize(cpg: Cpg): Unit + def initialize(cpg: Cpg): Unit = {} def forMethod(method: Method): Option[FlowSemantic] + + /** Builds a new `Semantics` whose `forMethod` behaviour first looks up `other` and, only if that fails (i.e. returns + * `None`), looks up the current one. + */ + def after(other: Semantics): Semantics = Semantics.compose(this, other) } -/** The empty Semantics */ -object NoSemantics extends Semantics { +object Semantics { - override def initialize(cpg: Cpg): Unit = {} + private def compose(first: Semantics, second: Semantics): Semantics = new Semantics { + + override def initialize(cpg: Cpg): Unit = { + second.initialize(cpg) + first.initialize(cpg) + } + + override def forMethod(method: Method): Option[FlowSemantic] = + second.forMethod(method).orElse { first.forMethod(method) } + } +} + +/** The empty Semantics, whose `forMethod` always fails, i.e. the identity under `Semantics.after`. */ +object NoSemantics extends Semantics { override def forMethod(method: Method): Option[FlowSemantic] = None } +/** The nil Semantics, whose `forMethod` always succeeds but returns the empty (nil) mapping. */ +object NilSemantics { + + /** Builds a universal nil semantics. 
Beware this is right-absorbing under `Semantics.after`. */ + def apply(): Semantics = new Semantics { + override def forMethod(method: Method): Option[FlowSemantic] = Some(FlowSemantic(method.fullName, List.empty)) + } + + /** Extensionally builds a nil semantics. */ + def where(methodFullNames: List[String], regex: Boolean = false): Semantics = + FullNameSemantics.fromList(methodFullNames.map { + FlowSemantic(_, List.empty, regex) + }) + + /** Intensionally builds a nil semantics. */ + def where(predicate: Method => Boolean): Semantics = new Semantics { + override def forMethod(method: Method): Option[FlowSemantic] = Option.when(predicate(method)) { + FlowSemantic(method.fullName, List.empty) + } + } +} + +/** Semantics whose mappings are: 0->0, PassThroughMapping. */ +object NoCrossTaintSemantics { + + /** Builds a universal no-cross-taint semantics. Beware this is right-absorbing under `Semantics.after`. */ + def apply(): Semantics = new Semantics { + override def forMethod(method: Method): Option[FlowSemantic] = Some( + FlowSemantic(method.fullName, List(FlowMapping(0, 0), PassThroughMapping)) + ) + } + + /** Extensionally builds a no-cross-taint semantics. */ + def where(methodFullNames: List[String], regex: Boolean = false): Semantics = + FullNameSemantics.fromList(methodFullNames.map { + FlowSemantic(_, List(FlowMapping(0, 0), PassThroughMapping), regex) + }) + + /** Intensionally builds a no-cross-taint semantics. 
*/ + def where(predicate: Method => Boolean): Semantics = new Semantics { + override def forMethod(method: Method): Option[FlowSemantic] = Option.when(predicate(method)) { + FlowSemantic(method.fullName, List(FlowMapping(0, 0), PassThroughMapping)) + } + } +} + case class FlowSemantic(methodFullName: String, mappings: List[FlowPath] = List.empty, regex: Boolean = false) object FlowSemantic { diff --git a/joern-cli/frontends/pysrc2cpg/src/test/scala/io/joern/pysrc2cpg/dataflow/DataFlowTests.scala b/joern-cli/frontends/pysrc2cpg/src/test/scala/io/joern/pysrc2cpg/dataflow/DataFlowTests.scala index 8e787af3b133..f82fe60b8483 100644 --- a/joern-cli/frontends/pysrc2cpg/src/test/scala/io/joern/pysrc2cpg/dataflow/DataFlowTests.scala +++ b/joern-cli/frontends/pysrc2cpg/src/test/scala/io/joern/pysrc2cpg/dataflow/DataFlowTests.scala @@ -2,7 +2,14 @@ package io.joern.pysrc2cpg.dataflow import io.joern.dataflowengineoss.DefaultSemantics import io.joern.dataflowengineoss.language.toExtendedCfgNode -import io.joern.dataflowengineoss.semanticsloader.{FlowMapping, FlowSemantic, PassThroughMapping} +import io.joern.dataflowengineoss.semanticsloader.{ + FlowMapping, + FlowSemantic, + NilSemantics, + NoCrossTaintSemantics, + NoSemantics, + PassThroughMapping +} import io.joern.pysrc2cpg.PySrc2CpgFixture import io.shiftleft.codepropertygraph.generated.Cpg import io.shiftleft.codepropertygraph.generated.nodes.{Literal, Member, Method} @@ -64,7 +71,7 @@ class DataFlowTests extends PySrc2CpgFixture(withOssDataflow = true) { |a = 20 |print(foo(a)) |""".stripMargin) - .withSemantics(DefaultSemantics().plus(List(FlowSemantic("helpers.py:.foo", List())))) + .withSemantics(DefaultSemantics().after(NilSemantics.where(List("helpers.py:.foo")))) val source = cpg.literal("20").l val sink = cpg.call("print").argument(1).l val flows = sink.reachableByFlows(source).l @@ -102,7 +109,7 @@ class DataFlowTests extends PySrc2CpgFixture(withOssDataflow = true) { |from helpers import foo |print(foo(20)) 
|""".stripMargin) - .withSemantics(DefaultSemantics().plus(List(FlowSemantic("helpers.py:.foo", List())))) + .withSemantics(DefaultSemantics().after(NilSemantics.where(List("helpers.py:.foo")))) val source = cpg.literal("20").l val sink = cpg.call("print").argument(1).l val flows = sink.reachableByFlows(source).l @@ -141,7 +148,7 @@ class DataFlowTests extends PySrc2CpgFixture(withOssDataflow = true) { |a = 20 |print(foo(a)) |""".stripMargin) - .withSemantics(DefaultSemantics().plus(List(FlowSemantic("Test0.py:.foo", List())))) + .withSemantics(DefaultSemantics().after(NilSemantics.where(List("Test0.py:.foo")))) val source = cpg.literal("20").l val sink = cpg.call("print").argument(1).l val flows = sink.reachableByFlows(source).l @@ -865,6 +872,143 @@ class DataFlowTests extends PySrc2CpgFixture(withOssDataflow = true) { } +class DefaultSemanticsDataFlowTest1 extends PySrc2CpgFixture(withOssDataflow = true, semantics = DefaultSemantics()) { + + "DefaultSemantics cross-taints arguments to external method calls" in { + val cpg = code(""" + |import bar + |a = 1 + |bar.foo(b, Z=a) + |bar.baz(b) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz") + sink.reachableByFlows(source).map(flowToResultPairs).l shouldBe List( + List(("a = 1", 3), ("bar.foo(b, Z = a)", 4), ("bar.baz(b)", 5)) + ) + } + + "DefaultSemantics taints external method call return values" in { + val cpg = code(""" + |import bar + |y = 1 + |x = bar.foo(y) + |bar.baz(x) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz") + sink.reachableByFlows(source).map(flowToResultPairs).l shouldBe List( + List(("y = 1", 3), ("bar.foo(y)", 4), ("x = bar.foo(y)", 4), ("bar.baz(x)", 5)) + ) + } + +} + +class NoSemanticsDataFlowTest1 extends PySrc2CpgFixture(withOssDataflow = true, semantics = NoSemantics) { + + "NoSemantics cross-taints arguments to external method calls" in { + val cpg = code(""" + |import bar + |a = 1 + |bar.foo(b, Z=a) + |bar.baz(b) + 
|""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz") + sink.reachableByFlows(source).map(flowToResultPairs).l shouldBe List( + List(("a = 1", 3), ("bar.foo(b, Z = a)", 4), ("bar.baz(b)", 5)) + ) + } + + "NoSemantics taints external method call return values" in { + val cpg = code(""" + |import bar + |y = 1 + |x = bar.foo(y) + |bar.baz(x) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz") + sink.reachableByFlows(source).map(flowToResultPairs).l shouldBe List( + List(("y = 1", 3), ("bar.foo(y)", 4), ("x = bar.foo(y)", 4), ("bar.baz(x)", 5)) + ) + } +} + +class NilSemanticsDataFlowTest1 + extends PySrc2CpgFixture(withOssDataflow = true, semantics = NilSemantics().after(DefaultSemantics())) { + + "NilSemantics does not cross-taint arguments to external method calls" in { + val cpg = code(""" + |import bar + |a = 1 + |bar.foo(b, Z=a) + |bar.baz(b) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz") + sink.reachableByFlows(source).map(flowToResultPairs) shouldBe empty + } + + "NilSemantics does not taint external method call return values" in { + val cpg = code(""" + |import bar + |y = 1 + |x = bar.foo(y) + |bar.baz(x) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz") + sink.reachableByFlows(source).map(flowToResultPairs).l shouldBe List() + } +} + +class NoCrossTaintDataFlowTest1 + extends PySrc2CpgFixture( + withOssDataflow = true, + semantics = NoCrossTaintSemantics.where(_.fullName.contains("bar.py")).after(DefaultSemantics()) + ) { + + "NoCrossTaintSemantics prevents cross-tainting arguments to external method calls" in { + val cpg = code(""" + |import bar + |a = 1 + |bar.foo(b, Z=a) + |bar.baz(b) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call("baz").argument.argumentIndex(1) + sink.reachableByFlows(source).map(flowToResultPairs) shouldBe empty + } +} + +class NoCrossTaintDataFlowTest2 + extends PySrc2CpgFixture( + 
withOssDataflow = true, + semantics = NoCrossTaintSemantics.where(_.fullName.contains("foo")).after(DefaultSemantics()) + ) { + + "NoCrossTaintSemantics works for specific external method call" in { + val cpg = code(""" + |import bar + |a = 1 + |bar.foo(a,b) # foo has no-cross-taint semantics, so b is not tainted by a + |bar.baz(a,c) # however, baz has default semantics, so c is tainted by a + |print(b) + |print(c) + |""".stripMargin) + val source = cpg.literal("1") + val sink = cpg.call.name("print").argument.argumentIndex(1) + // Note: it's unfortunate that `(bar.foo(a, b), 4)` still shows up in this flow. + // However, we can check that NoCrossTaintSemantics is doing its job, as otherwise + // we'd also have a `print(b)` sink. + sink.reachableByFlows(source).map(flowToResultPairs).l shouldBe List( + List(("a = 1", 3), ("bar.foo(a, b)", 4), ("bar.baz(a, c)", 5), ("print(c)", 7)) + ) + } + +} + class RegexDefinedFlowsDataFlowTests extends PySrc2CpgFixture( withOssDataflow = true,