Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix migration warnings in scio-core #9

Merged
merged 3 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,6 @@ lazy val `scio-core`: Project = project
)
else Nil
},
scalacOptions ++= {
if (scalaVersion.value.startsWith("3")) Seq("-source:3.0-migration") else Nil
},
compileOrder := CompileOrder.JavaThenScala,
)
.dependsOn(
Expand Down
10 changes: 5 additions & 5 deletions scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ object ContextAndArgs {
def parse(args: Array[String]): F[Result]
}

final case class DefaultParser[T <: PipelineOptions: ClassTag] private ()
final case class DefaultParser[T <: PipelineOptions: ClassTag] private[scio] ()
extends ArgsParser[Try] {
override type ArgsType = Args

Expand Down Expand Up @@ -213,7 +213,7 @@ object ContextAndArgs {
case _ => true
}

CaseApp.detailedParseWithHelp[T](customArgs) match {
CaseApp.detailedParseWithHelp[T](customArgs.toIndexedSeq) match {
case Left(error) =>
Failure(new Exception(error.message))
case Right((_, _, help, _)) if help =>
Expand Down Expand Up @@ -245,7 +245,7 @@ object ContextAndArgs {
}
}

final case class PipelineOptionsParser[T <: PipelineOptions: ClassTag] private ()
final case class PipelineOptionsParser[T <: PipelineOptions: ClassTag] private[scio] ()
extends ArgsParser[Try] {
override type ArgsType = T

Expand Down Expand Up @@ -412,7 +412,7 @@ object ScioContext {
}
} yield s"--$str($$|=)".r

val patterns = registeredPatterns + "--help($$|=)".r
val patterns = registeredPatterns.union(Set("--help($$|=)".r))

// Split cmdlineArgs into 2 parts, optArgs for PipelineOptions and appArgs for Args
val (optArgs, appArgs) =
Expand Down Expand Up @@ -690,7 +690,7 @@ class ScioContext private[scio] (
BuildInfo.version,
BuildInfo.scalaVersion,
sc.optionsAs[ApplicationNameOptions].getAppName,
state.toString,
this.state.toString,
getBeamMetrics
)

Expand Down
16 changes: 9 additions & 7 deletions scio-core/src/main/scala/com/spotify/scio/coders/Coder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,20 @@ private[scio] object Ref {
def unapply[T](c: Ref[T]): Option[(String, Coder[T])] = Option((c.typeName, c.value))
}

final case class RawBeam[T] private (beam: BCoder[T]) extends Coder[T] {
final case class RawBeam[T] private[coders] (beam: BCoder[T]) extends Coder[T] {
override def toString: String = s"RawBeam($beam)"
}
final case class Beam[T] private (beam: BCoder[T]) extends Coder[T] {
final case class Beam[T] private[coders] (beam: BCoder[T]) extends Coder[T] {
override def toString: String = s"Beam($beam)"
}
final case class Fallback[T] private (ct: ClassTag[T]) extends Coder[T] {
final case class Fallback[T] private[coders] (ct: ClassTag[T]) extends Coder[T] {
override def toString: String = s"Fallback($ct)"
}
final case class Transform[A, B] private (c: Coder[A], f: BCoder[A] => Coder[B]) extends Coder[B] {
final case class Transform[A, B] private[coders] (c: Coder[A], f: BCoder[A] => Coder[B])
extends Coder[B] {
override def toString: String = s"Transform($c, $f)"
}
final case class Disjunction[T, Id] private (
final case class Disjunction[T, Id] private[coders] (
typeName: String,
idCoder: Coder[Id],
id: T => Id,
Expand All @@ -97,7 +98,7 @@ final case class Disjunction[T, Id] private (
override def toString: String = s"Disjunction($typeName, $coder)"
}

final case class Record[T] private (
final case class Record[T] private[coders] (
typeName: String,
cs: Array[(String, Coder[Any])],
construct: Seq[Any] => T,
Expand All @@ -112,7 +113,8 @@ final case class Record[T] private (
}

// KV are special in beam and need to be serialized using an instance of KvCoder.
final case class KVCoder[K, V] private (koder: Coder[K], voder: Coder[V]) extends Coder[KV[K, V]] {
final case class KVCoder[K, V] private[coders] (koder: Coder[K], voder: Coder[V])
extends Coder[KV[K, V]] {
override def toString: String = s"KVCoder($koder, $voder)"
}

Expand Down
2 changes: 1 addition & 1 deletion scio-core/src/main/scala/com/spotify/scio/io/Tap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object MaterializeTap {
new MaterializeTap(path, CoderMaterializer.beam(context, Coder[T]))
}

final case class ClosedTap[T] private (private[scio] val underlying: Tap[T]) {
final case class ClosedTap[T] private[scio] (private[scio] val underlying: Tap[T]) {

/**
* Get access to the underlying Tap. The ScioContext has to be ran before.
Expand Down
6 changes: 4 additions & 2 deletions scio-core/src/main/scala/com/spotify/scio/io/Taps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ final private class PollingTaps(private[this] val backOff: BackOff) extends Taps
import scala.concurrent.ExecutionContext.Implicits.global
Future {
val sleeper = Sleeper.DEFAULT
do {
var first = true
while (first || BackOffUtils.next(sleeper, backOff)) {
first = false
if (polls.nonEmpty) {
val tap = if (polls.size > 1) "taps" else "tap"
logger.info(s"Polling for ${polls.size} $tap")
Expand All @@ -112,7 +114,7 @@ final private class PollingTaps(private[this] val backOff: BackOff) extends Taps
}
polls = pending
}
} while (BackOffUtils.next(sleeper, backOff))
}
polls.foreach(p => p.promise.failure(new TapNotAvailableException(p.name)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ object LogicalType {
Some(logicalType.underlying)
}

final case class Record[T] private (
final case class Record[T] private[schemas] (
schemas: Array[(String, Schema[Any])],
construct: Seq[Any] => T,
destruct: T => Array[Any]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object ProtobufUtil {
*/
def schemaMetadataOf[T <: Message: ClassTag]: Map[String, AnyRef] = {
import me.lyh.protobuf.generic
import me.lyh.protobuf.generic.JsonSchema
val schema = generic.Schema
.of[Message](classTag[T].asInstanceOf[ClassTag[Message]])
.toJson
Expand Down