
Commit af67340

Author: Davies Liu (committed)
sqlCtx -> sqlContext
1 parent 15a278f commit af67340

File tree

10 files changed, +64 -65 lines changed

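For orientation, here is a minimal sketch (not part of this commit; the app name is made up for illustration, and a local PySpark installation from the Spark 1.3 era is assumed) of what the renamed handle looks like in user code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local[2]", "sqlContext-rename-sketch")  # illustrative app name
sqlContext = SQLContext(sc)  # previously written in the examples as: sqlCtx = SQLContext(sc)

# The object and its API are unchanged; only the conventional variable name
# moves from sqlCtx to sqlContext.
df = sqlContext.createDataFrame([Row(name="Alice", age=1)])
df.registerTempTable("people")
print(sqlContext.sql("SELECT name FROM people WHERE age >= 1").collect())

sc.stop()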

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java

Lines changed: 10 additions & 10 deletions
@@ -55,7 +55,7 @@ public void setAge(int age) {
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
-    SQLContext sqlCtx = new SQLContext(ctx);
+    SQLContext sqlContext = new SQLContext(ctx);

     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
@@ -74,11 +74,11 @@ public Person call(String line) {
     });

     // Apply a schema to an RDD of Java Beans and register it as a table.
-    DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class);
+    DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
     schemaPeople.registerTempTable("people");

     // SQL can be run over RDDs that have been registered as tables.
-    DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

     // The results of SQL queries are DataFrames and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
@@ -99,12 +99,12 @@ public String call(Row row) {
     // Read in the parquet file created above.
     // Parquet files are self-describing so the schema is preserved.
     // The result of loading a parquet file is also a DataFrame.
-    DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");
+    DataFrame parquetFile = sqlContext.parquetFile("people.parquet");

     //Parquet files can also be registered as tables and then used in SQL statements.
     parquetFile.registerTempTable("parquetFile");
     DataFrame teenagers2 =
-      sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+      sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
     teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) {
@@ -120,7 +120,7 @@ public String call(Row row) {
     // The path can be either a single text file or a directory storing text files.
     String path = "examples/src/main/resources/people.json";
     // Create a DataFrame from the file(s) pointed by path
-    DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);
+    DataFrame peopleFromJsonFile = sqlContext.jsonFile(path);

     // Because the schema of a JSON dataset is automatically inferred, to write queries,
     // it is better to take a look at what is the schema.
@@ -133,8 +133,8 @@ public String call(Row row) {
     // Register this DataFrame as a table.
     peopleFromJsonFile.registerTempTable("people");

-    // SQL statements can be run by using the sql methods provided by sqlCtx.
-    DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    // SQL statements can be run by using the sql methods provided by sqlContext.
+    DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

     // The results of SQL queries are DataFrame and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
@@ -151,7 +151,7 @@ public String call(Row row) {
     List<String> jsonData = Arrays.asList(
          "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-    DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
+    DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd());

     // Take a look at the schema of this new DataFrame.
     peopleFromJsonRDD.printSchema();
@@ -164,7 +164,7 @@ public String call(Row row) {

     peopleFromJsonRDD.registerTempTable("people2");

-    DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+    DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
     List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) {

examples/src/main/python/ml/simple_text_classification_pipeline.py

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@

 if __name__ == "__main__":
     sc = SparkContext(appName="SimpleTextClassificationPipeline")
-    sqlCtx = SQLContext(sc)
+    sqlContext = SQLContext(sc)

     # Prepare training documents, which are labeled.
     LabeledDocument = Row("id", "text", "label")

examples/src/main/python/mllib/dataset_example.py

Lines changed: 3 additions & 3 deletions
@@ -44,19 +44,19 @@ def summarize(dataset):
         print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
         exit(-1)
     sc = SparkContext(appName="DatasetExample")
-    sqlCtx = SQLContext(sc)
+    sqlContext = SQLContext(sc)
     if len(sys.argv) == 2:
         input = sys.argv[1]
     else:
         input = "data/mllib/sample_libsvm_data.txt"
     points = MLUtils.loadLibSVMFile(sc, input)
-    dataset0 = sqlCtx.inferSchema(points).setName("dataset0").cache()
+    dataset0 = sqlContext.inferSchema(points).setName("dataset0").cache()
     summarize(dataset0)
     tempdir = tempfile.NamedTemporaryFile(delete=False).name
     os.unlink(tempdir)
     print "Save dataset as a Parquet file to %s." % tempdir
     dataset0.saveAsParquetFile(tempdir)
     print "Load it back and summarize it again."
-    dataset1 = sqlCtx.parquetFile(tempdir).setName("dataset1").cache()
+    dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
     summarize(dataset1)
     shutil.rmtree(tempdir)

python/pyspark/ml/classification.py

Lines changed: 2 additions & 2 deletions
@@ -91,9 +91,9 @@ class LogisticRegressionModel(JavaModel):
     # The small batch size here ensures that we see multiple batches,
     # even in these small test examples:
     sc = SparkContext("local[2]", "ml.feature tests")
-    sqlCtx = SQLContext(sc)
+    sqlContext = SQLContext(sc)
     globs['sc'] = sc
-    globs['sqlCtx'] = sqlCtx
+    globs['sqlContext'] = sqlContext
     (failure_count, test_count) = doctest.testmod(
         globs=globs, optionflags=doctest.ELLIPSIS)
     sc.stop()

python/pyspark/ml/feature.py

Lines changed: 2 additions & 2 deletions
@@ -117,9 +117,9 @@ def setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
     # The small batch size here ensures that we see multiple batches,
     # even in these small test examples:
     sc = SparkContext("local[2]", "ml.feature tests")
-    sqlCtx = SQLContext(sc)
+    sqlContext = SQLContext(sc)
     globs['sc'] = sc
-    globs['sqlCtx'] = sqlCtx
+    globs['sqlContext'] = sqlContext
     (failure_count, test_count) = doctest.testmod(
         globs=globs, optionflags=doctest.ELLIPSIS)
     sc.stop()

python/pyspark/shell.py

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@
     platform.python_version(),
     platform.python_build()[0],
     platform.python_build()[1]))
-print("SparkContext available as sc, %s available as sqlContext." % sqlCtx.__class__.__name__)
+print("SparkContext available as sc, %s available as sqlContext." % sqlContext.__class__.__name__)

 if add_files is not None:
     print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead")

python/pyspark/sql/context.py

Lines changed: 39 additions & 40 deletions
@@ -37,12 +37,12 @@
 __all__ = ["SQLContext", "HiveContext", "UDFRegistration"]


-def _monkey_patch_RDD(sqlCtx):
+def _monkey_patch_RDD(sqlContext):
     def toDF(self, schema=None, sampleRatio=None):
         """
         Converts current :class:`RDD` into a :class:`DataFrame`

-        This is a shorthand for ``sqlCtx.createDataFrame(rdd, schema, sampleRatio)``
+        This is a shorthand for ``sqlContext.createDataFrame(rdd, schema, sampleRatio)``

         :param schema: a StructType or list of names of columns
         :param samplingRatio: the sample ratio of rows used for inferring
@@ -51,7 +51,7 @@ def toDF(self, schema=None, sampleRatio=None):
         >>> rdd.toDF().collect()
         [Row(name=u'Alice', age=1)]
         """
-        return sqlCtx.createDataFrame(self, schema, sampleRatio)
+        return sqlContext.createDataFrame(self, schema, sampleRatio)

     RDD.toDF = toDF

@@ -75,13 +75,13 @@ def __init__(self, sparkContext, sqlContext=None):
         """Creates a new SQLContext.

         >>> from datetime import datetime
-        >>> sqlCtx = SQLContext(sc)
+        >>> sqlContext = SQLContext(sc)
         >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L,
         ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
         ...     time=datetime(2014, 8, 1, 14, 1, 5))])
         >>> df = allTypes.toDF()
         >>> df.registerTempTable("allTypes")
-        >>> sqlCtx.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
+        >>> sqlContext.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
         ...     'from allTypes where b and i > 0').collect()
         [Row(c0=2, c1=2.0, c2=False, c3=2, c4=0...8, 1, 14, 1, 5), a=1)]
         >>> df.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time,
@@ -133,18 +133,18 @@ def registerFunction(self, name, f, returnType=StringType()):
         :param samplingRatio: lambda function
         :param returnType: a :class:`DataType` object

-        >>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x))
-        >>> sqlCtx.sql("SELECT stringLengthString('test')").collect()
+        >>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
+        >>> sqlContext.sql("SELECT stringLengthString('test')").collect()
         [Row(c0=u'4')]

         >>> from pyspark.sql.types import IntegerType
-        >>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
-        >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
+        >>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
+        >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(c0=4)]

         >>> from pyspark.sql.types import IntegerType
-        >>> sqlCtx.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
-        >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect()
+        >>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
+        >>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
         [Row(c0=4)]
         """
         func = lambda _, it: imap(lambda x: f(*x), it)
@@ -229,38 +229,38 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):
         :param samplingRatio: the sample ratio of rows used for inferring

         >>> l = [('Alice', 1)]
-        >>> sqlCtx.createDataFrame(l).collect()
+        >>> sqlContext.createDataFrame(l).collect()
         [Row(_1=u'Alice', _2=1)]
-        >>> sqlCtx.createDataFrame(l, ['name', 'age']).collect()
+        >>> sqlContext.createDataFrame(l, ['name', 'age']).collect()
         [Row(name=u'Alice', age=1)]

         >>> d = [{'name': 'Alice', 'age': 1}]
-        >>> sqlCtx.createDataFrame(d).collect()
+        >>> sqlContext.createDataFrame(d).collect()
         [Row(age=1, name=u'Alice')]

         >>> rdd = sc.parallelize(l)
-        >>> sqlCtx.createDataFrame(rdd).collect()
+        >>> sqlContext.createDataFrame(rdd).collect()
         [Row(_1=u'Alice', _2=1)]
-        >>> df = sqlCtx.createDataFrame(rdd, ['name', 'age'])
+        >>> df = sqlContext.createDataFrame(rdd, ['name', 'age'])
         >>> df.collect()
         [Row(name=u'Alice', age=1)]

         >>> from pyspark.sql import Row
         >>> Person = Row('name', 'age')
         >>> person = rdd.map(lambda r: Person(*r))
-        >>> df2 = sqlCtx.createDataFrame(person)
+        >>> df2 = sqlContext.createDataFrame(person)
         >>> df2.collect()
         [Row(name=u'Alice', age=1)]

         >>> from pyspark.sql.types import *
         >>> schema = StructType([
         ...    StructField("name", StringType(), True),
         ...    StructField("age", IntegerType(), True)])
-        >>> df3 = sqlCtx.createDataFrame(rdd, schema)
+        >>> df3 = sqlContext.createDataFrame(rdd, schema)
         >>> df3.collect()
         [Row(name=u'Alice', age=1)]

-        >>> sqlCtx.createDataFrame(df.toPandas()).collect()  # doctest: +SKIP
+        >>> sqlContext.createDataFrame(df.toPandas()).collect()  # doctest: +SKIP
         [Row(name=u'Alice', age=1)]
         """
         if isinstance(data, DataFrame):
@@ -316,7 +316,7 @@ def registerDataFrameAsTable(self, df, tableName):

         Temporary tables exist only during the lifetime of this instance of :class:`SQLContext`.

-        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
+        >>> sqlContext.registerDataFrameAsTable(df, "table1")
         """
         if (df.__class__ is DataFrame):
             self._ssql_ctx.registerDataFrameAsTable(df._jdf, tableName)
@@ -330,7 +330,7 @@ def parquetFile(self, *paths):
         >>> parquetFile = tempfile.mkdtemp()
         >>> shutil.rmtree(parquetFile)
         >>> df.saveAsParquetFile(parquetFile)
-        >>> df2 = sqlCtx.parquetFile(parquetFile)
+        >>> df2 = sqlContext.parquetFile(parquetFile)
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
         """
@@ -352,7 +352,7 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
         >>> shutil.rmtree(jsonFile)
         >>> with open(jsonFile, 'w') as f:
         ...     f.writelines(jsonStrings)
-        >>> df1 = sqlCtx.jsonFile(jsonFile)
+        >>> df1 = sqlContext.jsonFile(jsonFile)
         >>> df1.printSchema()
         root
          |-- field1: long (nullable = true)
@@ -365,7 +365,7 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
         ...     StructField("field2", StringType()),
         ...     StructField("field3",
         ...         StructType([StructField("field5", ArrayType(IntegerType()))]))])
-        >>> df2 = sqlCtx.jsonFile(jsonFile, schema)
+        >>> df2 = sqlContext.jsonFile(jsonFile, schema)
         >>> df2.printSchema()
         root
          |-- field2: string (nullable = true)
@@ -386,11 +386,11 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         If the schema is provided, applies the given schema to this JSON dataset.
         Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.

-        >>> df1 = sqlCtx.jsonRDD(json)
+        >>> df1 = sqlContext.jsonRDD(json)
         >>> df1.first()
         Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)

-        >>> df2 = sqlCtx.jsonRDD(json, df1.schema)
+        >>> df2 = sqlContext.jsonRDD(json, df1.schema)
         >>> df2.first()
         Row(field1=1, field2=u'row1', field3=Row(field4=11, field5=None), field6=None)

@@ -400,7 +400,7 @@ def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         ...     StructField("field3",
         ...         StructType([StructField("field5", ArrayType(IntegerType()))]))
         ... ])
-        >>> df3 = sqlCtx.jsonRDD(json, schema)
+        >>> df3 = sqlContext.jsonRDD(json, schema)
         >>> df3.first()
         Row(field2=u'row1', field3=Row(field5=None))
         """
@@ -480,8 +480,8 @@ def createExternalTable(self, tableName, path=None, source=None,
     def sql(self, sqlQuery):
         """Returns a :class:`DataFrame` representing the result of the given query.

-        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
-        >>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
+        >>> sqlContext.registerDataFrameAsTable(df, "table1")
+        >>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
         >>> df2.collect()
         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
         """
@@ -490,8 +490,8 @@ def sql(self, sqlQuery):
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.

-        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
-        >>> df2 = sqlCtx.table("table1")
+        >>> sqlContext.registerDataFrameAsTable(df, "table1")
+        >>> df2 = sqlContext.table("table1")
         >>> sorted(df.collect()) == sorted(df2.collect())
         True
         """
@@ -505,8 +505,8 @@ def tables(self, dbName=None):
         The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
         (a column with :class:`BooleanType` indicating if a table is a temporary one or not).

-        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
-        >>> df2 = sqlCtx.tables()
+        >>> sqlContext.registerDataFrameAsTable(df, "table1")
+        >>> df2 = sqlContext.tables()
         >>> df2.filter("tableName = 'table1'").first()
         Row(tableName=u'table1', isTemporary=True)
         """
@@ -520,10 +520,10 @@ def tableNames(self, dbName=None):

         If ``dbName`` is not specified, the current database will be used.

-        >>> sqlCtx.registerDataFrameAsTable(df, "table1")
-        >>> "table1" in sqlCtx.tableNames()
+        >>> sqlContext.registerDataFrameAsTable(df, "table1")
+        >>> "table1" in sqlContext.tableNames()
         True
-        >>> "table1" in sqlCtx.tableNames("db")
+        >>> "table1" in sqlContext.tableNames("db")
         True
         """
         if dbName is None:
@@ -578,11 +578,11 @@ def _get_hive_ctx(self):
 class UDFRegistration(object):
     """Wrapper for user-defined function registration."""

-    def __init__(self, sqlCtx):
-        self.sqlCtx = sqlCtx
+    def __init__(self, sqlContext):
+        self.sqlContext = sqlContext

     def register(self, name, f, returnType=StringType()):
-        return self.sqlCtx.registerFunction(name, f, returnType)
+        return self.sqlContext.registerFunction(name, f, returnType)

     register.__doc__ = SQLContext.registerFunction.__doc__

@@ -595,13 +595,12 @@ def _test():
     globs = pyspark.sql.context.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc
-    globs['sqlCtx'] = sqlCtx = SQLContext(sc)
+    globs['sqlContext'] = SQLContext(sc)
     globs['rdd'] = rdd = sc.parallelize(
         [Row(field1=1, field2="row1"),
          Row(field1=2, field2="row2"),
          Row(field1=3, field2="row3")]
     )
-    _monkey_patch_RDD(sqlCtx)
     globs['df'] = rdd.toDF()
     jsonStrings = [
         '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
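The _monkey_patch_RDD change above renames only the captured variable; rdd.toDF() remains a shorthand for sqlContext.createDataFrame(rdd, schema, sampleRatio). A minimal sketch of that shorthand (not part of this commit; it assumes a local PySpark installation of the same vintage, and the app name is illustrative):

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local[2]", "toDF-shorthand-sketch")
sqlContext = SQLContext(sc)  # constructing the context installs RDD.toDF via _monkey_patch_RDD

rdd = sc.parallelize([Row(name="Alice", age=1)])
df = rdd.toDF()              # shorthand for sqlContext.createDataFrame(rdd)
print(df.collect())          # roughly: [Row(age=1, name=u'Alice')]

sc.stop()

This is presumably also why the explicit _monkey_patch_RDD(sqlCtx) call could be dropped from _test(): creating the SQLContext for the doctest globals already performs the patch.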
