@@ -20,6 +20,7 @@ import java.time.DateTimeException
2020
2121import scala .jdk .CollectionConverters ._
2222import scala .reflect .ClassTag
23+ import scala .util .control .NonFatal
2324
2425import com .google .rpc .ErrorInfo
2526import io .grpc .{ManagedChannel , StatusRuntimeException }
@@ -59,6 +60,11 @@ private[client] class GrpcExceptionConverter(channel: ManagedChannel) extends Lo
5960 } catch {
6061 case e : StatusRuntimeException =>
6162 throw toThrowable(e, sessionId, userContext, clientType)
63+ case NonFatal (e) =>
64+ throw new SparkException (
65+ errorClass = " CONNECT_CLIENT_INTERNAL_ERROR" ,
66+ messageParameters = Map (" message" -> e.toString),
67+ cause = e)
6268 }
6369 }
6470
@@ -139,6 +145,22 @@ private[client] class GrpcExceptionConverter(channel: ManagedChannel) extends Lo
139145 clientType : String ): Throwable = {
140146 val status = StatusProto .fromThrowable(ex)
141147
148+ if (status == null ) {
149+ val statusCode = ex.getStatus.getCode
150+ val (errorClass, sqlState) = statusCode match {
151+ case io.grpc.Status .Code .PERMISSION_DENIED =>
152+ (" INSUFFICIENT_PERMISSIONS" , " 42501" )
153+ case io.grpc.Status .Code .UNAUTHENTICATED =>
154+ (" UNAUTHENTICATED" , " 08000" )
155+ case _ =>
156+ (" CONNECT_CLIENT_INTERNAL_ERROR" , " XXKCI" )
157+ }
158+ return new SparkException (
159+ errorClass = errorClass,
160+ messageParameters = Map (" message" -> ex.toString),
161+ cause = ex)
162+ }
163+
142164 // Extract the ErrorInfo from the StatusProto, if present.
143165 val errorInfoOpt = status.getDetailsList.asScala
144166 .find(_.is(classOf [ErrorInfo ]))
@@ -159,7 +181,10 @@ private[client] class GrpcExceptionConverter(channel: ManagedChannel) extends Lo
159181 }
160182
161183 // If no ErrorInfo is found, create a SparkException based on the StatusRuntimeException.
162- new SparkException (ex.toString, ex.getCause)
184+ new SparkException (
185+ errorClass = " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" ,
186+ messageParameters = Map (" message" -> ex.toString),
187+ cause = ex.getCause)
163188 }
164189}
165190
@@ -168,7 +193,6 @@ private[client] object GrpcExceptionConverter {
168193 private [client] case class ErrorParams (
169194 message : String ,
170195 cause : Option [Throwable ],
171- // errorClass will only be set if the error is SparkThrowable.
172196 errorClass : Option [String ],
173197 // messageParameters will only be set if the error is both enriched and SparkThrowable.
174198 messageParameters : Map [String , String ],
@@ -186,13 +210,13 @@ private[client] object GrpcExceptionConverter {
186210 new StreamingQueryException (
187211 params.message,
188212 params.cause.orNull,
189- params.errorClass.orNull ,
213+ params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
190214 params.messageParameters)),
191215 errorConstructor(params =>
192216 new ParseException (
193217 None ,
194218 Origin (),
195- errorClass = params.errorClass.orNull ,
219+ errorClass = params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
196220 messageParameters = params.messageParameters,
197221 queryContext = params.queryContext)),
198222 errorConstructor(params =>
@@ -202,26 +226,29 @@ private[client] object GrpcExceptionConverter {
202226 cause = params.cause,
203227 context = params.queryContext)),
204228 errorConstructor(params =>
205- new NamespaceAlreadyExistsException (params.errorClass.orNull, params.messageParameters)),
229+ new NamespaceAlreadyExistsException (params.errorClass.getOrElse(
230+ " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" ), params.messageParameters)),
206231 errorConstructor(params =>
207232 new TableAlreadyExistsException (
208- params.errorClass.orNull ,
233+ params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
209234 params.messageParameters,
210235 params.cause)),
211236 errorConstructor(params =>
212237 new TempTableAlreadyExistsException (
213- params.errorClass.orNull ,
238+ params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
214239 params.messageParameters,
215240 params.cause)),
216241 errorConstructor(params =>
217242 new NoSuchDatabaseException (
218- params.errorClass.orNull ,
243+ params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
219244 params.messageParameters,
220245 params.cause)),
221246 errorConstructor(params =>
222- new NoSuchNamespaceException (params.errorClass.orNull, params.messageParameters)),
247+ new NoSuchNamespaceException (params.errorClass.getOrElse(
248+ " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" ), params.messageParameters)),
223249 errorConstructor(params =>
224- new NoSuchTableException (params.errorClass.orNull, params.messageParameters, params.cause)),
250+ new NoSuchTableException (params.errorClass.getOrElse(
251+ " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" ), params.messageParameters, params.cause)),
225252 errorConstructor[NumberFormatException ](params =>
226253 new SparkNumberFormatException (
227254 errorClass = params.errorClass.getOrElse(" _LEGACY_ERROR_TEMP_3104" ),
@@ -255,20 +282,21 @@ private[client] object GrpcExceptionConverter {
255282 params.queryContext)),
256283 errorConstructor(params =>
257284 new SparkRuntimeException (
258- params.errorClass.orNull ,
285+ params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
259286 params.messageParameters,
260287 params.cause.orNull,
261288 params.queryContext)),
262289 errorConstructor(params =>
263290 new SparkUpgradeException (
264- params.errorClass.orNull ,
291+ params.errorClass.getOrElse( " CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE " ) ,
265292 params.messageParameters,
266293 params.cause.orNull)),
267294 errorConstructor(params =>
268295 new SparkException (
269296 message = params.message,
270297 cause = params.cause.orNull,
271- errorClass = params.errorClass,
298+ errorClass = params.errorClass.orElse(
299+ Option (" CONNECT_CLIENT_UNEXPECTED_MISSING_SQL_STATE" )),
272300 messageParameters = params.messageParameters,
273301 context = params.queryContext)))
274302
0 commit comments