Skip to content

Commit f54338a

Browse files
committed
Make build flake8 clean and add flake8 precheck to .travis.yml
1 parent d4baeea commit f54338a

25 files changed

+277
-247
lines changed

.travis.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,8 @@ before_install:
1616
- pip install --upgrade pip
1717
- pip install pytest-timeout
1818
install:
19+
- pip install flake8
1920
- pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" . .[avro]
20-
script: py.test -v --timeout 20 --ignore=tmp-build --import-mode append
21+
script:
22+
- flake8
23+
- py.test -v --timeout 20 --ignore=tmp-build --import-mode append

confluent_kafka/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
__all__ = ['cimpl', 'avro', 'kafkatest']
2-
from .cimpl import (Consumer,
2+
from .cimpl import (Consumer, # noqa
33
KafkaError,
44
KafkaException,
55
Message,

confluent_kafka/avro/__init__.py

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,58 +2,12 @@
22
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
33
44
"""
5-
import sys
65

76
from confluent_kafka import Producer, Consumer
8-
9-
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
10-
11-
12-
def loads(schema_str):
13-
""" Parse a schema given a schema string """
14-
if sys.version_info[0] < 3:
15-
return schema.parse(schema_str)
16-
else:
17-
return schema.Parse(schema_str)
18-
19-
20-
def load(fp):
21-
""" Parse a schema from a file path """
22-
with open(fp) as f:
23-
return loads(f.read())
24-
25-
26-
# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitely as a quick fix
27-
def _hash_func(self):
28-
return hash(str(self))
29-
30-
31-
try:
32-
from avro import schema
33-
34-
schema.RecordSchema.__hash__ = _hash_func
35-
schema.PrimitiveSchema.__hash__ = _hash_func
36-
except ImportError:
37-
pass
38-
39-
40-
class ClientError(Exception):
41-
""" Error thrown by Schema Registry clients """
42-
43-
def __init__(self, message, http_code=None):
44-
self.message = message
45-
self.http_code = http_code
46-
super(ClientError, self).__init__(self.__str__())
47-
48-
def __repr__(self):
49-
return "ClientError(error={error})".format(error=self.message)
50-
51-
def __str__(self):
52-
return self.message
53-
54-
7+
from confluent_kafka.avro.error import ClientError
8+
from confluent_kafka.avro.load import load, loads # noqa
559
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
56-
from confluent_kafka.avro.serializer import (SerializerError,
10+
from confluent_kafka.avro.serializer import (SerializerError, # noqa
5711
KeySerializerError,
5812
ValueSerializerError)
5913
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
@@ -117,7 +71,6 @@ def produce(self, **kwargs):
11771
else:
11872
raise KeySerializerError("Avro schema required for key")
11973

120-
12174
super(AvroProducer, self).produce(topic, value, key, **kwargs)
12275

12376

confluent_kafka/avro/cached_schema_registry_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525

2626
import requests
2727

28-
from . import ClientError, VALID_LEVELS
28+
from .error import ClientError
2929
from . import loads
3030

31+
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
32+
3133
# Common accept header sent
3234
ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
3335
log = logging.getLogger(__name__)
@@ -119,7 +121,7 @@ def register(self, subject, avro_schema):
119121

120122
schemas_to_id = self.subject_to_schema_ids[subject]
121123
schema_id = schemas_to_id.get(avro_schema, None)
122-
if schema_id != None:
124+
if schema_id is not None:
123125
return schema_id
124126
# send it up
125127
url = '/'.join([self.url, 'subjects', subject, 'versions'])
@@ -222,7 +224,7 @@ def get_version(self, subject, avro_schema):
222224
"""
223225
schemas_to_version = self.subject_to_schema_versions[subject]
224226
version = schemas_to_version.get(avro_schema, None)
225-
if version != None:
227+
if version is not None:
226228
return version
227229

228230
url = '/'.join([self.url, 'subjects', subject])

confluent_kafka/avro/error.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2017 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
19+
class ClientError(Exception):
20+
""" Error thrown by Schema Registry clients """
21+
22+
def __init__(self, message, http_code=None):
23+
self.message = message
24+
self.http_code = http_code
25+
super(ClientError, self).__init__(self.__str__())
26+
27+
def __repr__(self):
28+
return "ClientError(error={error})".format(error=self.message)
29+
30+
def __str__(self):
31+
return self.message

confluent_kafka/avro/load.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2017 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import sys
19+
20+
21+
def loads(schema_str):
22+
""" Parse a schema given a schema string """
23+
if sys.version_info[0] < 3:
24+
return schema.parse(schema_str)
25+
else:
26+
return schema.Parse(schema_str)
27+
28+
29+
def load(fp):
30+
""" Parse a schema from a file path """
31+
with open(fp) as f:
32+
return loads(f.read())
33+
34+
35+
# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitly as
36+
# a quick fix
37+
def _hash_func(self):
38+
return hash(str(self))
39+
40+
41+
try:
42+
from avro import schema
43+
44+
schema.RecordSchema.__hash__ = _hash_func
45+
schema.PrimitiveSchema.__hash__ = _hash_func
46+
except ImportError:
47+
schema = None

confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,9 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
125125
if not schema:
126126
raise serialize_err("Schema does not exist")
127127
self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
128-
except ClientError as e:
128+
except ClientError:
129129
exc_type, exc_value, exc_traceback = sys.exc_info()
130-
raise serialize_err( + repr(
131-
traceback.format_exception(exc_type, exc_value, exc_traceback)))
130+
raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
132131

133132
# get the writer
134133
writer = self.id_to_writers[schema_id]
@@ -171,14 +170,15 @@ def _get_decoder_func(self, schema_id, payload):
171170
# try to use fast avro
172171
try:
173172
schema_dict = schema.to_json()
174-
obj = read_data(payload, schema_dict)
175-
# here means we passed so this is something fastavro can do
176-
# seek back since it will be called again for the
177-
# same payload - one time hit
173+
read_data(payload, schema_dict)
178174

175+
# If we reach this point, this means we have fastavro and it can
176+
# do this deserialization. Rewind since this method just determines
177+
# the reader function and we need to deserialize again along the
178+
# normal path.
179179
payload.seek(curr_pos)
180-
decoder_func = lambda p: read_data(p, schema_dict)
181-
self.id_to_decoder_func[schema_id] = decoder_func
180+
181+
self.id_to_decoder_func[schema_id] = lambda p: read_data(p, schema_dict)
182182
return self.id_to_decoder_func[schema_id]
183183
except:
184184
pass

confluent_kafka/kafkatest/verifiable_client.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,22 @@
1313
# limitations under the License.
1414

1515

16-
import signal, socket, os, sys, time, json, re, datetime
16+
import datetime
17+
import json
18+
import os
19+
import re
20+
import signal
21+
import socket
22+
import sys
23+
import time
1724

1825

1926
class VerifiableClient(object):
2027
"""
2128
Generic base class for a kafkatest verifiable client.
2229
Implements the common kafkatest protocol and semantics.
2330
"""
24-
def __init__ (self, conf):
31+
def __init__(self, conf):
2532
"""
2633
"""
2734
super(VerifiableClient, self).__init__()
@@ -31,37 +38,36 @@ def __init__ (self, conf):
3138
signal.signal(signal.SIGTERM, self.sig_term)
3239
self.dbg('Pid is %d' % os.getpid())
3340

34-
def sig_term (self, sig, frame):
41+
def sig_term(self, sig, frame):
3542
self.dbg('SIGTERM')
3643
self.run = False
3744

3845
@staticmethod
39-
def _timestamp ():
46+
def _timestamp():
4047
return time.strftime('%H:%M:%S', time.localtime())
4148

42-
def dbg (self, s):
49+
def dbg(self, s):
4350
""" Debugging printout """
4451
sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s))
4552

46-
def err (self, s, term=False):
53+
def err(self, s, term=False):
4754
""" Error printout, if term=True the process will terminate immediately. """
4855
sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s))
4956
if term:
5057
sys.stderr.write('%% FATAL ERROR ^\n')
5158
sys.exit(1)
5259

53-
def send (self, d):
60+
def send(self, d):
5461
""" Send dict as JSON to stdout for consumtion by kafkatest handler """
5562
d['_time'] = str(datetime.datetime.now())
5663
self.dbg('SEND: %s' % json.dumps(d))
5764
sys.stdout.write('%s\n' % json.dumps(d))
5865
sys.stdout.flush()
5966

60-
6167
@staticmethod
62-
def set_config (conf, args):
68+
def set_config(conf, args):
6369
""" Set client config properties using args dict. """
64-
for n,v in args.iteritems():
70+
for n, v in args.iteritems():
6571
if v is None:
6672
continue
6773
# Things to ignore

0 commit comments

Comments
 (0)