diff --git a/build.sbt b/build.sbt
index 7ba262c6..9724561c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -240,7 +240,9 @@ lazy val datasetSettings =
         mc("frameless.functions.FramelessLit"),
         mc(f"frameless.functions.FramelessLit$$"),
         dmm("frameless.functions.package.litAggr"),
-        dmm("org.apache.spark.sql.FramelessInternals.column")
+        dmm("org.apache.spark.sql.FramelessInternals.column"),
+        dmm("frameless.TypedEncoder.collectionEncoder"),
+        dmm("frameless.TypedEncoder.setEncoder")
       )
     },
     coverageExcludedPackages := "org.apache.spark.sql.reflection",
diff --git a/dataset/src/main/scala/frameless/CollectionCaster.scala b/dataset/src/main/scala/frameless/CollectionCaster.scala
new file mode 100644
index 00000000..bf329992
--- /dev/null
+++ b/dataset/src/main/scala/frameless/CollectionCaster.scala
@@ -0,0 +1,67 @@
+package frameless
+
+import frameless.TypedEncoder.CollectionConversion
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{
+  CodegenContext,
+  CodegenFallback,
+  ExprCode
+}
+import org.apache.spark.sql.catalyst.expressions.{ Expression, UnaryExpression }
+import org.apache.spark.sql.types.{ DataType, ObjectType }
+
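+/**
+ * Casts the collection produced at runtime by `child` (under interpreted
+ * MapObjects this is a Seq regardless of the requested type, see #804) to
+ * the target collection type `C[Y]` via the supplied CollectionConversion.
+ */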
+case class CollectionCaster[F[_], C[_], Y](
+    child: Expression,
+    conversion: CollectionConversion[F, C, Y])
+    extends UnaryExpression
+    with CodegenFallback {
+
+  protected def withNewChildInternal(newChild: Expression): Expression =
+    copy(child = newChild)
+
+  override def eval(input: InternalRow): Any = {
+    val o = child.eval(input).asInstanceOf[Object]
+    o match {
+      case col: F[Y] @unchecked =>
+        conversion.convert(col)
+      case _ => o
+    }
+  }
+
+  override def dataType: DataType = child.dataType
+}
+
+case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression)
+    extends UnaryExpression {
+
+  protected def withNewChildInternal(newChild: Expression): Expression =
+    copy(child = newChild)
+
+  // eval works in interpreted mode, but a CodegenFallback does not: e.g. with
+  // ColumnTests.asCol and Vectors the generated code still expects a child of
+  // type Vector while the child's eval returns an X2, hence the explicit
+  // doGenCode below
+  override def eval(input: InternalRow): Any = {
+    val o = child.eval(input).asInstanceOf[Object]
+    o match {
+      case col: Set[Y] @unchecked =>
+        col.toSeq
+      case _ => o
+    }
+  }
+
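+  // evaluates isSet when the child's JVM type is a scala.collection.Set,
+  // otherwise evaluates or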
+  def toSeqOr[T](isSet: => T, or: => T): T =
+    child.dataType match {
+      case ObjectType(cls)
+          if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
+        isSet
+      case _ => or
+    }
+
+  override def dataType: DataType =
+    toSeqOr(ObjectType(classOf[scala.collection.Seq[_]]), child.dataType)
+
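+  // codegen path: Set children are eagerly converted via toVector, all other
+  // children pass through unchanged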
+  override protected def doGenCode(
+      ctx: CodegenContext,
+      ev: ExprCode
+    ): ExprCode =
+    defineCodeGen(ctx, ev, c => toSeqOr(s"$c.toVector()", s"$c"))
+
+}
diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala
index b42b026e..928a05d6 100644
--- a/dataset/src/main/scala/frameless/TypedEncoder.scala
+++ b/dataset/src/main/scala/frameless/TypedEncoder.scala
@@ -1,15 +1,10 @@
 package frameless
 
 import java.math.BigInteger
-
 import java.util.Date
-
-import java.time.{ Duration, Instant, Period, LocalDate }
-
+import java.time.{ Duration, Instant, LocalDate, Period }
 import java.sql.Timestamp
-
 import scala.reflect.ClassTag
-
 import org.apache.spark.sql.FramelessInternals
 import org.apache.spark.sql.FramelessInternals.UserDefinedType
 import org.apache.spark.sql.{ reflection => ScalaReflection }
@@ -22,10 +17,11 @@ import org.apache.spark.sql.catalyst.util.{
 }
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-
 import shapeless._
 import shapeless.ops.hlist.IsHCons
 
+import scala.collection.immutable.{ ListSet, TreeSet }
+
 abstract class TypedEncoder[T](
     implicit
     val classTag: ClassTag[T])
@@ -501,10 +497,76 @@ object TypedEncoder {
       override def toString: String = s"arrayEncoder($jvmRepr)"
     }
 
-  implicit def collectionEncoder[C[X] <: Seq[X], T](
+  /**
+   * Per #804: when MapObjects is evaluated in interpreted mode it returns a
+   * Seq, not the more specific collection type derived for compiled code.
+   *
+   * This type class offers extensible conversion to more specific collection
+   * types. By default Seq, List and Vector are supported for Seq, and Set,
+   * TreeSet and ListSet for Set.
+   *
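+   * A conversion to an additional target type can be supplied locally. A
+   * sketch for an immutable Queue target (not provided by this change, shown
+   * only as an illustration):
+   *
+   * {{{
+   * import scala.collection.immutable.Queue
+   *
+   * implicit def seqToQueue[Y]: CollectionConversion[Seq, Queue, Y] =
+   *   new CollectionConversion[Seq, Queue, Y] {
+   *     override def convert(c: Seq[Y]): Queue[Y] =
+   *       Queue.newBuilder[Y].++=(c).result()
+   *   }
+   * }}}
+   *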
+   * @tparam F the wider source collection type produced at runtime
+   * @tparam C the target collection type expected by the encoder
+   * @tparam Y the element type
+   */
+  trait CollectionConversion[F[_], C[_], Y] extends Serializable {
+    def convert(c: F[Y]): C[Y]
+  }
+
+  object CollectionConversion {
+
+    implicit def seqToSeq[Y]: CollectionConversion[Seq, Seq, Y] =
+      new CollectionConversion[Seq, Seq, Y] {
+
+      override def convert(c: Seq[Y]): Seq[Y] =
+        c match {
+          // interpreted MapObjects may produce a lazy Stream; materialise it
+          case _: Stream[Y] @unchecked => c.toVector.toSeq
+          case _                       => c
+        }
+    }
+
+    implicit def seqToVector[Y]: CollectionConversion[Seq, Vector, Y] =
+      new CollectionConversion[Seq, Vector, Y] {
+      override def convert(c: Seq[Y]): Vector[Y] = c.toVector
+    }
+
+    implicit def seqToList[Y]: CollectionConversion[Seq, List, Y] =
+      new CollectionConversion[Seq, List, Y] {
+      override def convert(c: Seq[Y]): List[Y] = c.toList
+    }
+
+    implicit def setToSet[Y]: CollectionConversion[Set, Set, Y] =
+      new CollectionConversion[Set, Set, Y] {
+      override def convert(c: Set[Y]): Set[Y] = c
+    }
+
+    implicit def setToTreeSet[Y](
+        implicit
+        ordering: Ordering[Y]
+      ): CollectionConversion[Set, TreeSet, Y] =
+      new CollectionConversion[Set, TreeSet, Y] {
+
+      override def convert(c: Set[Y]): TreeSet[Y] =
+        TreeSet.newBuilder.++=(c).result()
+    }
+
+    implicit def setToListSet[Y]: CollectionConversion[Set, ListSet, Y] =
+      new CollectionConversion[Set, ListSet, Y] {
+
+      override def convert(c: Set[Y]): ListSet[Y] =
+        ListSet.newBuilder.++=(c).result()
+    }
+  }
+
+  implicit def seqEncoder[C[X] <: Seq[X], T](
+      implicit
+      i0: Lazy[RecordFieldEncoder[T]],
+      i1: ClassTag[C[T]],
+      i2: CollectionConversion[Seq, C, T]
+    ): TypedEncoder[C[T]] = collectionEncoder[Seq, C, T]
+
+  implicit def setEncoder[C[X] <: Set[X], T](
+      implicit
+      i0: Lazy[RecordFieldEncoder[T]],
+      i1: ClassTag[C[T]],
+      i2: CollectionConversion[Set, C, T]
+    ): TypedEncoder[C[T]] = collectionEncoder[Set, C, T]
+
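+  // shared implementation behind seqEncoder and setEncoder; e.g.
+  // implicitly[TypedEncoder[Vector[Int]]] resolves through seqEncoder with the
+  // CollectionConversion[Seq, Vector, Int] provided by seqToVector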
+  def collectionEncoder[O[_], C[X], T](
       implicit
       i0: Lazy[RecordFieldEncoder[T]],
-      i1: ClassTag[C[T]]
+      i1: ClassTag[C[T]],
+      i2: CollectionConversion[O, C, T]
     ): TypedEncoder[C[T]] = new TypedEncoder[C[T]] {
     private lazy val encodeT = i0.value.encoder
 
@@ -521,38 +583,31 @@ object TypedEncoder {
       if (ScalaReflection.isNativeType(enc.jvmRepr)) {
         NewInstance(classOf[GenericArrayData], path :: Nil, catalystRepr)
       } else {
-        MapObjects(enc.toCatalyst, path, enc.jvmRepr, encodeT.nullable)
+        // MapObjects iterates over a Seq, so Set children must first be converted to Seq
+        MapObjects(
+          enc.toCatalyst,
+          SeqCaster(path),
+          enc.jvmRepr,
+          encodeT.nullable
+        )
       }
     }
 
     def fromCatalyst(path: Expression): Expression =
-      MapObjects(
-        i0.value.fromCatalyst,
-        path,
-        encodeT.catalystRepr,
-        encodeT.nullable,
-        Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly
-      )
+      CollectionCaster[O, C, T](
+        MapObjects(
+          i0.value.fromCatalyst,
+          path,
+          encodeT.catalystRepr,
+          encodeT.nullable,
+          Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly when compiling
+        ),
+        implicitly[CollectionConversion[O, C, T]]
+      ) // This will convert Seq to the appropriate C[_] when eval'ing.
 
     override def toString: String = s"collectionEncoder($jvmRepr)"
   }
 
-  /**
-   * @param i1 implicit lazy `RecordFieldEncoder[T]` to encode individual elements of the set.
-   * @param i2 implicit `ClassTag[Set[T]]` to provide runtime information about the set type.
-   * @tparam T the element type of the set.
-   * @return a `TypedEncoder` instance for `Set[T]`.
-   */
-  implicit def setEncoder[T](
-      implicit
-      i1: shapeless.Lazy[RecordFieldEncoder[T]],
-      i2: ClassTag[Set[T]]
-    ): TypedEncoder[Set[T]] = {
-    implicit val inj: Injection[Set[T], Seq[T]] = Injection(_.toSeq, _.toSet)
-
-    TypedEncoder.usingInjection
-  }
-
   /**
    * @tparam A the key type
    * @tparam B the value type
diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala
index 4ebf5d93..ab1f3581 100644
--- a/dataset/src/test/scala/frameless/EncoderTests.scala
+++ b/dataset/src/test/scala/frameless/EncoderTests.scala
@@ -1,7 +1,6 @@
 package frameless
 
-import scala.collection.immutable.Set
-
+import scala.collection.immutable.{ ListSet, Set, TreeSet }
 import org.scalatest.matchers.should.Matchers
 
 object EncoderTests {
@@ -10,6 +9,8 @@ object EncoderTests {
   case class InstantRow(i: java.time.Instant)
   case class DurationRow(d: java.time.Duration)
   case class PeriodRow(p: java.time.Period)
+
+  case class ContainerOf[CC[X] <: Iterable[X]](a: CC[X1[Int]])
 }
 
 class EncoderTests extends TypedDatasetSuite with Matchers {
@@ -32,4 +33,55 @@ class EncoderTests extends TypedDatasetSuite with Matchers {
   test("It should encode java.time.Period") {
     implicitly[TypedEncoder[PeriodRow]]
   }
+
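+  // round-trips a collection of X1 values through a Dataset in both
+  // interpreted and codegen modes (via evalCodeGens), checking the decoded
+  // collection equals the original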
+  def performCollection[C[X] <: Iterable[X]](
+      toType: Seq[X1[Int]] => C[X1[Int]]
+    )(implicit
+      ce: TypedEncoder[C[X1[Int]]]
+    ): (Unit, Unit) = evalCodeGens {
+
+    implicit val cte = TypedExpressionEncoder[C[X1[Int]]]
+    implicit val e = implicitly[TypedEncoder[ContainerOf[C]]]
+    implicit val te = TypedExpressionEncoder[ContainerOf[C]]
+    implicit val xe = implicitly[TypedEncoder[X1[ContainerOf[C]]]]
+    implicit val xte = TypedExpressionEncoder[X1[ContainerOf[C]]]
+    val v = toType((1 to 20).map(X1(_)))
+    val ds = {
+      sqlContext.createDataset(Seq(X1[ContainerOf[C]](ContainerOf[C](v))))
+    }
+    ds.head.a.a shouldBe v
+    ()
+  }
+
+  test("It should serde a Seq of Objects") {
+    performCollection[Seq](identity)
+  }
+
+  test("It should serde a Set of Objects") {
+    performCollection[Set](_.toSet)
+  }
+
+  test("It should serde a Vector of Objects") {
+    performCollection[Vector](_.toVector)
+  }
+
+  test("It should serde a TreeSet of Objects") {
+    // an explicit Ordering for X1[Int] is only needed on Scala 2.12
+    implicit val ordering: Ordering[X1[Int]] = new Ordering[X1[Int]] {
+      val intordering = implicitly[Ordering[Int]]
+
+      override def compare(x: X1[Int], y: X1[Int]): Int =
+        intordering.compare(x.a, y.a)
+    }
+
+    performCollection[TreeSet](TreeSet.newBuilder.++=(_).result())
+  }
+
+  test("It should serde a List of Objects") {
+    performCollection[List](_.toList)
+  }
+
+  test("It should serde a ListSet of Objects") {
+    performCollection[ListSet](ListSet.newBuilder.++=(_).result())
+  }
 }
diff --git a/dataset/src/test/scala/frameless/package.scala b/dataset/src/test/scala/frameless/package.scala
index 82ff375c..06b92d99 100644
--- a/dataset/src/test/scala/frameless/package.scala
+++ b/dataset/src/test/scala/frameless/package.scala
@@ -1,9 +1,12 @@
-import java.time.format.DateTimeFormatter
-import java.time.{LocalDateTime => JavaLocalDateTime}
+import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+import org.apache.spark.sql.internal.SQLConf
 
-import org.scalacheck.{Arbitrary, Gen}
+import java.time.format.DateTimeFormatter
+import java.time.{ LocalDateTime => JavaLocalDateTime }
+import org.scalacheck.{ Arbitrary, Gen }
 
 package object frameless {
+
   /** Fixed decimal point to avoid precision problems specific to Spark */
   implicit val arbBigDecimal: Arbitrary[BigDecimal] = Arbitrary {
     for {
@@ -30,11 +33,22 @@ package object frameless {
   }
 
   // see issue with scalacheck non serializable Vector: https://github.com/rickynils/scalacheck/issues/315
-  implicit def arbVector[A](implicit A: Arbitrary[A]): Arbitrary[Vector[A]] =
+  implicit def arbVector[A](
+      implicit
+      A: Arbitrary[A]
+    ): Arbitrary[Vector[A]] =
     Arbitrary(Gen.listOf(A.arbitrary).map(_.toVector))
 
   def vectorGen[A: Arbitrary]: Gen[Vector[A]] = arbVector[A].arbitrary
 
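+  // like arbVector above, backs scala.collection.Seq with a Vector to avoid
+  // the scalacheck serialization issue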
+  implicit def arbSeq[A](
+      implicit
+      A: Arbitrary[A]
+    ): Arbitrary[scala.collection.Seq[A]] =
+    Arbitrary(Gen.listOf(A.arbitrary).map(_.toVector.toSeq))
+
+  def seqGen[A: Arbitrary]: Gen[scala.collection.Seq[A]] = arbSeq[A].arbitrary
+
   implicit val arbUdtEncodedClass: Arbitrary[UdtEncodedClass] = Arbitrary {
     for {
       int <- Arbitrary.arbitrary[Int]
@@ -42,7 +56,8 @@ package object frameless {
     } yield new UdtEncodedClass(int, doubles.toArray)
   }
 
-  val dateTimeFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
+  val dateTimeFormatter: DateTimeFormatter =
+    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
 
   implicit val localDateArb: Arbitrary[JavaLocalDateTime] = Arbitrary {
     for {
@@ -72,11 +87,10 @@ package object frameless {
   def anyCauseHas(t: Throwable, f: Throwable => Boolean): Boolean =
     if (f(t))
       true
+    else if (t.getCause ne null)
+      anyCauseHas(t.getCause, f)
     else
-      if (t.getCause ne null)
-        anyCauseHas(t.getCause, f)
-      else
-        false
+      false
 
   /**
    * Runs up to maxRuns and outputs the number of failures (times thrown)
@@ -85,11 +99,11 @@ package object frameless {
    * @tparam T
   * @return the result of the last passing run, or null if none passed
    */
-  def runLoads[T](maxRuns: Int = 1000)(thunk: => T): T ={
+  def runLoads[T](maxRuns: Int = 1000)(thunk: => T): T = {
     var i = 0
     var r = null.asInstanceOf[T]
     var passed = 0
-    while(i < maxRuns){
+    while (i < maxRuns) {
       i += 1
       try {
         r = thunk
@@ -98,29 +112,36 @@ package object frameless {
           println(s"run $i successful")
         }
       } catch {
-        case t: Throwable => System.err.println(s"failed unexpectedly on run $i - ${t.getMessage}")
+        case t: Throwable =>
+          System.err.println(s"failed unexpectedly on run $i - ${t.getMessage}")
       }
     }
     if (passed != maxRuns) {
-      System.err.println(s"had ${maxRuns - passed} failures out of $maxRuns runs")
+      System.err.println(
+        s"had ${maxRuns - passed} failures out of $maxRuns runs"
+      )
     }
     r
   }
 
-    /**
+  /**
   * Runs a given thunk up to maxRuns times, restarting it whenever tolerantOf returns true for the thrown Throwable
    * @param tolerantOf
    * @param maxRuns default of 20
    * @param thunk
   * @return the result of the first successful run; if every run fails, the last error is thrown
    */
-  def tolerantRun[T](tolerantOf: Throwable => Boolean, maxRuns: Int = 20)(thunk: => T): T ={
+  def tolerantRun[T](
+      tolerantOf: Throwable => Boolean,
+      maxRuns: Int = 20
+    )(thunk: => T
+    ): T = {
     var passed = false
     var i = 0
     var res: T = null.asInstanceOf[T]
     var thrown: Throwable = null
 
-    while((i < maxRuns) && !passed) {
+    while ((i < maxRuns) && !passed) {
       try {
         i += 1
         res = thunk
@@ -139,4 +160,58 @@ package object frameless {
     }
     res
   }
+
+  // adapted from Quality, which in turn adapted Spark's own test utilities
+
+  // if this blows up, debug at CodeGenerator lines 1294 and 1299 and capture code.body
+  def forceCodeGen[T](f: => T): T = {
+    val codegenMode = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
+
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenMode) {
+      f
+    }
+  }
+
+  def forceInterpreted[T](f: => T): T = {
+    val codegenMode = CodegenObjectFactoryMode.NO_CODEGEN.toString
+
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenMode) {
+      f
+    }
+  }
+
+  /**
+   * Runs the same thunk in interpreted mode and again with codegen forced.
+   *
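+   * A usage sketch, assuming some Dataset value ds:
+   *
+   * {{{
+   * val (interpreted, codegenned) = evalCodeGens { ds.head }
+   * }}}
+   *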
+   * @param f the thunk to evaluate under both modes
+   * @tparam T the result type of the thunk
+   * @return a pair of (interpreted result, codegen result)
+   */
+  def evalCodeGens[T](f: => T): (T, T) =
+    (forceInterpreted(f), forceCodeGen(f))
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+   * configurations.
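+   *
+   * e.g. withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> "NO_CODEGEN") { f }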
+   */
+  protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+    val conf = SQLConf.get
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map { key =>
+      if (conf.contains(key)) {
+        Some(conf.getConfString(key))
+      } else {
+        None
+      }
+    }
+    keys.zip(values).foreach { case (k, v) => conf.setConfString(k, v) }
+    try f
+    finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None)        => conf.unsetConf(key)
+      }
+    }
+  }
+
 }