Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove dead code #584

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions src/lavinmq/client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -435,22 +435,10 @@ module LavinMQ
notify_has_capacity(count)
end

def unacked_for_queue(queue) : Iterator(SegmentPosition)
@unacked.each.select(&.queue.==(queue)).map(&.sp)
end

def unacked_count
@unacked.size
end

def each_unacked(& : Unack -> Nil)
@unack_lock.synchronize do
@unacked.each do |unack|
yield unack
end
end
end

record TxAck, delivery_tag : UInt64, multiple : Bool, negative : Bool, requeue : Bool
@tx_acks = Array(TxAck).new

Expand Down
4 changes: 0 additions & 4 deletions src/lavinmq/client/channel/consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ module LavinMQ
end
end

def name
@tag
end

# blocks until the consumer can accept more messages
private def wait_for_capacity : Nil
if @prefetch_count > 0
Expand Down
16 changes: 0 additions & 16 deletions src/lavinmq/client/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ module LavinMQ
spawn read_loop, name: "Client#read_loop #{@remote_address}"
end

# socket's file descriptor
def fd
case @socket
when OpenSSL::SSL::Socket
@socket.as(OpenSSL::SSL::Socket).@bio.io.as(IO::FileDescriptor).fd
when TCPSocket
@socket.as(TCPSocket).fd
when UNIXSocket
@socket.as(UNIXSocket).fd
when WebSocketIO
@socket.as(WebSocketIO).fd
else
raise "Unexpected socket #{@socket.class}"
end
end

# Returns client provided connection name if set, else server generated name
def client_name
@client_properties["connection_name"]?.try(&.as(String)) || @name
Expand Down
7 changes: 0 additions & 7 deletions src/lavinmq/http/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,6 @@ module LavinMQ
end
end

private def refuse_unless_monitoring(context, user)
unless user.tags.any? { |t| t.administrator? || t.monitoring? }
Log.warn { "user=#{user.name} does not have monitoring access" }
access_refused(context)
end
end

private def refuse_unless_administrator(context, user : User)
unless user.tags.any? &.administrator?
Log.warn { "user=#{user.name} does not have administrator access" }
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/http/handler/websocket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,5 @@ module LavinMQ
def read_timeout=(timeout)
@r.read_timeout = timeout
end

def fd
io = @ws.@ws.@io
return io.fd if io.responds_to?(:fd)
0
end
end
end
9 changes: 0 additions & 9 deletions src/lavinmq/http/http_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ module LavinMQ
@http = ::HTTP::Server.new(handlers)
end

def bind(socket)
addr = @http.bind(socket)
Log.info { "Bound to #{addr}" }
end

def bind_tcp(address, port)
addr = @http.bind_tcp address, port
Log.info { "Bound to #{addr}" }
Expand Down Expand Up @@ -83,10 +78,6 @@ module LavinMQ
File.delete?(INTERNAL_UNIX_SOCKET)
end

def closed?
@http.closed?
end

class NotFoundError < Exception; end

class ExpectedBodyError < ArgumentError; end
Expand Down
19 changes: 0 additions & 19 deletions src/lavinmq/message.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ module LavinMQ
body = bytes[pos, sz]
BytesMessage.new(ts, ex, rk, pr, sz, body)
end

def to_io(io : IO, format = IO::ByteFormat::SystemEndian)
io.write_bytes @timestamp, format
io.write_bytes AMQ::Protocol::ShortString.new(@exchange_name), format
io.write_bytes AMQ::Protocol::ShortString.new(@routing_key), format
io.write_bytes @properties, format
io.write_bytes @bodysize, format
io.write @body
end
end

# Messages from publishers, read from socket and then written to mmap files
Expand All @@ -87,16 +78,6 @@ module LavinMQ
@properties.bytesize + sizeof(UInt64) + @bodysize
end

def dlx : String?
@properties.headers.try(&.fetch("x-dead-letter-exchange", nil).as?(String))
end

def delay : UInt32?
@properties.headers.try(&.fetch("x-delay", nil)).as?(Int).try(&.to_u32)
rescue OverflowError
nil
end

def to_io(io : IO, format = IO::ByteFormat::SystemEndian)
io.write_bytes @timestamp, format
io.write_bytes AMQ::Protocol::ShortString.new(@exchange_name), format
Expand Down
59 changes: 0 additions & 59 deletions src/lavinmq/mfile.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ class MFile < IO
end
end

# Opens an existing file in readonly mode
def self.open(path) : self
self.new(path)
end

# Opens an existing file in readonly mode
def self.open(path, & : self -> _)
mfile = self.new(path)
Expand Down Expand Up @@ -75,12 +70,6 @@ class MFile < IO
stat.st_size.to_i64
end

def disk_usage : Int64
code = LibC.fstat(@fd, out stat)
raise File::Error.from_errno("Unable to get info", file: @path) if code < 0
stat.st_blocks.to_i64 * 512
end

private def mmap(length = @capacity) : Pointer(UInt8)
return Pointer(UInt8).null if length.zero?
protection = case
Expand Down Expand Up @@ -175,17 +164,6 @@ class MFile < IO
raise RuntimeError.from_errno("msync") if code < 0
end

private def mremap(addr, old_len, new_len) : Pointer(UInt8)
{% if flag?(:linux) %}
ptr = LibC.mremap(addr, old_len, new_len, LibC::MREMAP_MAYMOVE)
raise IO::Error.from_errno("mremap") if ptr == LibC::MAP_FAILED
ptr.as(UInt8*)
{% else %}
munmap(addr, old_len)
mmap(addr, new_len)
{% end %}
end

def finalize
LibC.close(@fd) if @fd > -1
LibC.munmap(@buffer, @capacity) unless @buffer.null?
Expand Down Expand Up @@ -271,46 +249,9 @@ class MFile < IO
DontNeed
end

def truncate(capacity : Int) : Nil
return if capacity == @capacity
capacity = capacity.to_i64
code = LibC.ftruncate(@fd, capacity)
raise File::Error.from_errno("Error truncating file", file: @path) if code < 0
old_capacity = @capacity
@capacity = capacity
@size = capacity if @size > capacity
@pos = capacity if @pos > capacity
@buffer = mremap(@buffer, old_capacity, capacity) unless @buffer.null?
end

def resize(new_size : Int) : Nil
raise ArgumentError.new("Can't expand file larger than capacity, use truncate") if new_size > @capacity
@size = new_size.to_i64
@pos = new_size.to_i64 if @pos > new_size
end

def rename(new_path)
File.rename @path, new_path
@path = new_path
end

def read_at(offset : Int, slice : Bytes)
total = slice.size
until slice.empty?
cnt = LibC.pread(@fd, slice, slice.size, offset)
raise File::Error.from_errno("pread", file: @path) if cnt < 0
break if cnt.zero?
slice += cnt
offset += cnt
end
total - slice.size
end

PAGESIZE = LibC.sysconf(LibC::SC_PAGESIZE).to_u32

private def page_align(n : Int) : Int
n += PAGESIZE - 1
n -= n & PAGESIZE - 1
n
end
end
9 changes: 0 additions & 9 deletions src/lavinmq/parameter_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ module LavinMQ

forward_missing_to @parameters

def create(data : JSON::Any, save = true)
p = T.from_json(data)
create(p, save)
end

def create(parameter : T, save = true)
@parameters[parameter.name] = parameter
save! if save
Expand Down Expand Up @@ -50,10 +45,6 @@ module LavinMQ
end
end

def close
save!
end

def to_json(json : JSON::Builder)
json.array do
each_value do |p|
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/proxy_protocol.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ module LavinMQ
ensure
io.read_timeout = nil
end

def self.encode(header, io)
ipv = header.src.family.inet6? ? 6 : 4
io.print "PROXY TCP#{ipv} #{header.src.address} #{header.dst.address} #{header.src.port} #{header.dst.port}\r\n"
io.flush
end
end

module V2
Expand Down
4 changes: 0 additions & 4 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,6 @@ module LavinMQ
@msg_store.empty?
end

def any? : Bool
!empty?
end

def consumer_count
@consumers.size.to_u32
end
Expand Down
5 changes: 0 additions & 5 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,6 @@ module LavinMQ
start_size - @size
end

private def update_stat_per_msg(seg, ts, bytesize)
super
@segment_last_ts[seg] = last_ts
end

def delete(sp) : Nil
raise "Only full segments should be deleted"
end
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/schema.cr
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,6 @@ module LavinMQ
file.flush
version
end

def self.verify_or_prefix(file, type) : Int32
verify(file, type)
rescue IO::EOFError
prefix(file, type)
end
end

class OutdatedSchemaVersion < Exception
Expand Down
4 changes: 0 additions & 4 deletions src/lavinmq/shovel/shovel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,6 @@ module LavinMQ
Log.info &.emit("Terminated", name: @name, vhost: @vhost.name)
end

def delete
terminate
end

def terminated?
@state.terminated?
end
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/shovel/shovel_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ module LavinMQ
end
end

def each(&)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because forward_missing_to further up? Maybe remove that instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we currently use each_value for shovels, instead of its own each
why remove forward_missing_to and exchange each_value for each instead of just removing each?

@shovels.each_value do |v|
yield v
end
end

private def destination(name, config, ack_mode, delete_after, prefetch)
uri = URI.parse(config["dest-uri"].as_s)
case uri.scheme
Expand Down
15 changes: 0 additions & 15 deletions src/lavinmq/user.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ module LavinMQ
PolicyMaker
Impersonator

def to_json(json : JSON::Builder)
to_s.downcase.to_json(json)
end

def self.parse_list(list : String) : Array(Tag)
list.split(",").compact_map { |t| Tag.parse?(t.strip) }
end
Expand Down Expand Up @@ -212,17 +208,6 @@ module LavinMQ
perm != /^$/ && perm != // && perm.matches? name
end

private def hash_algorithm(hash)
case hash
when /^\$2a\$/ then "Bcrypt"
when /^\$1\$/ then "MD5"
when /^\$5\$/ then "SHA256"
when /^\$6\$/ then "SHA512"
when /^$/ then ""
else raise UnknownHashAlgoritm.new
end
end

class UnknownHashAlgoritm < Exception; end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then drop this too i guess

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still used higher up in file

end
end
16 changes: 0 additions & 16 deletions src/stdlib/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@ class Channel(T)
@lock.unlock
end

def try_receive : T?
@lock.lock
state, value = receive_internal
case state
in .delivered?
raise "BUG: Unexpected UseDefault value for delivered receive" if value.is_a?(UseDefault)
value
in .closed?
raise ClosedError.new
in .none?
nil
end
ensure
@lock.unlock
end

def try_receive? : T?
@lock.lock
state, value = receive_internal
Expand Down
5 changes: 0 additions & 5 deletions src/stdlib/io_memory.cr

This file was deleted.

Loading