Skip to content

Commit

Permalink
remove dead code (#584)
Browse files Browse the repository at this point in the history
* remove dead code
  • Loading branch information
kickster97 authored Nov 1, 2023
1 parent 1a78484 commit 4c7bf8a
Show file tree
Hide file tree
Showing 18 changed files with 0 additions and 208 deletions.
12 changes: 0 additions & 12 deletions src/lavinmq/client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -435,22 +435,10 @@ module LavinMQ
notify_has_capacity(count)
end

def unacked_for_queue(queue) : Iterator(SegmentPosition)
@unacked.each.select(&.queue.==(queue)).map(&.sp)
end

def unacked_count
@unacked.size
end

def each_unacked(& : Unack -> Nil)
@unack_lock.synchronize do
@unacked.each do |unack|
yield unack
end
end
end

record TxAck, delivery_tag : UInt64, multiple : Bool, negative : Bool, requeue : Bool
@tx_acks = Array(TxAck).new

Expand Down
4 changes: 0 additions & 4 deletions src/lavinmq/client/channel/consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,6 @@ module LavinMQ
end
end

def name
@tag
end

# blocks until the consumer can accept more messages
private def wait_for_capacity : Nil
if @prefetch_count > 0
Expand Down
16 changes: 0 additions & 16 deletions src/lavinmq/client/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ module LavinMQ
spawn read_loop, name: "Client#read_loop #{@remote_address}"
end

# socket's file descriptor
def fd
case @socket
when OpenSSL::SSL::Socket
@socket.as(OpenSSL::SSL::Socket).@bio.io.as(IO::FileDescriptor).fd
when TCPSocket
@socket.as(TCPSocket).fd
when UNIXSocket
@socket.as(UNIXSocket).fd
when WebSocketIO
@socket.as(WebSocketIO).fd
else
raise "Unexpected socket #{@socket.class}"
end
end

# Returns client provided connection name if set, else server generated name
def client_name
@client_properties["connection_name"]?.try(&.as(String)) || @name
Expand Down
7 changes: 0 additions & 7 deletions src/lavinmq/http/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,6 @@ module LavinMQ
end
end

private def refuse_unless_monitoring(context, user)
unless user.tags.any? { |t| t.administrator? || t.monitoring? }
Log.warn { "user=#{user.name} does not have monitoring access" }
access_refused(context)
end
end

private def refuse_unless_administrator(context, user : User)
unless user.tags.any? &.administrator?
Log.warn { "user=#{user.name} does not have administrator access" }
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/http/handler/websocket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,5 @@ module LavinMQ
def read_timeout=(timeout)
@r.read_timeout = timeout
end

def fd
io = @ws.@ws.@io
return io.fd if io.responds_to?(:fd)
0
end
end
end
9 changes: 0 additions & 9 deletions src/lavinmq/http/http_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ module LavinMQ
@http = ::HTTP::Server.new(handlers)
end

def bind(socket)
addr = @http.bind(socket)
Log.info { "Bound to #{addr}" }
end

def bind_tcp(address, port)
addr = @http.bind_tcp address, port
Log.info { "Bound to #{addr}" }
Expand Down Expand Up @@ -83,10 +78,6 @@ module LavinMQ
File.delete?(INTERNAL_UNIX_SOCKET)
end

def closed?
@http.closed?
end

class NotFoundError < Exception; end

class ExpectedBodyError < ArgumentError; end
Expand Down
19 changes: 0 additions & 19 deletions src/lavinmq/message.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ module LavinMQ
body = bytes[pos, sz]
BytesMessage.new(ts, ex, rk, pr, sz, body)
end

def to_io(io : IO, format = IO::ByteFormat::SystemEndian)
io.write_bytes @timestamp, format
io.write_bytes AMQ::Protocol::ShortString.new(@exchange_name), format
io.write_bytes AMQ::Protocol::ShortString.new(@routing_key), format
io.write_bytes @properties, format
io.write_bytes @bodysize, format
io.write @body
end
end

# Messages from publishers, read from socket and then written to mmap files
Expand All @@ -87,16 +78,6 @@ module LavinMQ
@properties.bytesize + sizeof(UInt64) + @bodysize
end

def dlx : String?
@properties.headers.try(&.fetch("x-dead-letter-exchange", nil).as?(String))
end

def delay : UInt32?
@properties.headers.try(&.fetch("x-delay", nil)).as?(Int).try(&.to_u32)
rescue OverflowError
nil
end

def to_io(io : IO, format = IO::ByteFormat::SystemEndian)
io.write_bytes @timestamp, format
io.write_bytes AMQ::Protocol::ShortString.new(@exchange_name), format
Expand Down
59 changes: 0 additions & 59 deletions src/lavinmq/mfile.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ class MFile < IO
end
end

# Opens an existing file in readonly mode
def self.open(path) : self
self.new(path)
end

# Opens an existing file in readonly mode
def self.open(path, & : self -> _)
mfile = self.new(path)
Expand Down Expand Up @@ -75,12 +70,6 @@ class MFile < IO
stat.st_size.to_i64
end

def disk_usage : Int64
code = LibC.fstat(@fd, out stat)
raise File::Error.from_errno("Unable to get info", file: @path) if code < 0
stat.st_blocks.to_i64 * 512
end

private def mmap(length = @capacity) : Pointer(UInt8)
return Pointer(UInt8).null if length.zero?
protection = case
Expand Down Expand Up @@ -175,17 +164,6 @@ class MFile < IO
raise RuntimeError.from_errno("msync") if code < 0
end

private def mremap(addr, old_len, new_len) : Pointer(UInt8)
{% if flag?(:linux) %}
ptr = LibC.mremap(addr, old_len, new_len, LibC::MREMAP_MAYMOVE)
raise IO::Error.from_errno("mremap") if ptr == LibC::MAP_FAILED
ptr.as(UInt8*)
{% else %}
munmap(addr, old_len)
mmap(addr, new_len)
{% end %}
end

def finalize
LibC.close(@fd) if @fd > -1
LibC.munmap(@buffer, @capacity) unless @buffer.null?
Expand Down Expand Up @@ -271,46 +249,9 @@ class MFile < IO
DontNeed
end

def truncate(capacity : Int) : Nil
return if capacity == @capacity
capacity = capacity.to_i64
code = LibC.ftruncate(@fd, capacity)
raise File::Error.from_errno("Error truncating file", file: @path) if code < 0
old_capacity = @capacity
@capacity = capacity
@size = capacity if @size > capacity
@pos = capacity if @pos > capacity
@buffer = mremap(@buffer, old_capacity, capacity) unless @buffer.null?
end

def resize(new_size : Int) : Nil
raise ArgumentError.new("Can't expand file larger than capacity, use truncate") if new_size > @capacity
@size = new_size.to_i64
@pos = new_size.to_i64 if @pos > new_size
end

def rename(new_path)
File.rename @path, new_path
@path = new_path
end

def read_at(offset : Int, slice : Bytes)
total = slice.size
until slice.empty?
cnt = LibC.pread(@fd, slice, slice.size, offset)
raise File::Error.from_errno("pread", file: @path) if cnt < 0
break if cnt.zero?
slice += cnt
offset += cnt
end
total - slice.size
end

PAGESIZE = LibC.sysconf(LibC::SC_PAGESIZE).to_u32

private def page_align(n : Int) : Int
n += PAGESIZE - 1
n -= n & PAGESIZE - 1
n
end
end
9 changes: 0 additions & 9 deletions src/lavinmq/parameter_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ module LavinMQ

forward_missing_to @parameters

def create(data : JSON::Any, save = true)
p = T.from_json(data)
create(p, save)
end

def create(parameter : T, save = true)
@parameters[parameter.name] = parameter
save! if save
Expand Down Expand Up @@ -50,10 +45,6 @@ module LavinMQ
end
end

def close
save!
end

def to_json(json : JSON::Builder)
json.array do
each_value do |p|
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/proxy_protocol.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ module LavinMQ
ensure
io.read_timeout = nil
end

def self.encode(header, io)
ipv = header.src.family.inet6? ? 6 : 4
io.print "PROXY TCP#{ipv} #{header.src.address} #{header.dst.address} #{header.src.port} #{header.dst.port}\r\n"
io.flush
end
end

module V2
Expand Down
4 changes: 0 additions & 4 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,6 @@ module LavinMQ
@msg_store.empty?
end

def any? : Bool
!empty?
end

def consumer_count
@consumers.size.to_u32
end
Expand Down
5 changes: 0 additions & 5 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,6 @@ module LavinMQ
start_size - @size
end

private def update_stat_per_msg(seg, ts, bytesize)
super
@segment_last_ts[seg] = last_ts
end

def delete(sp) : Nil
raise "Only full segments should be deleted"
end
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/schema.cr
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,6 @@ module LavinMQ
file.flush
version
end

def self.verify_or_prefix(file, type) : Int32
verify(file, type)
rescue IO::EOFError
prefix(file, type)
end
end

class OutdatedSchemaVersion < Exception
Expand Down
4 changes: 0 additions & 4 deletions src/lavinmq/shovel/shovel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,6 @@ module LavinMQ
Log.info &.emit("Terminated", name: @name, vhost: @vhost.name)
end

def delete
terminate
end

def terminated?
@state.terminated?
end
Expand Down
6 changes: 0 additions & 6 deletions src/lavinmq/shovel/shovel_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ module LavinMQ
end
end

def each(&)
@shovels.each_value do |v|
yield v
end
end

private def destination(name, config, ack_mode, delete_after, prefetch)
uri = URI.parse(config["dest-uri"].as_s)
case uri.scheme
Expand Down
15 changes: 0 additions & 15 deletions src/lavinmq/user.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ module LavinMQ
PolicyMaker
Impersonator

def to_json(json : JSON::Builder)
to_s.downcase.to_json(json)
end

def self.parse_list(list : String) : Array(Tag)
list.split(",").compact_map { |t| Tag.parse?(t.strip) }
end
Expand Down Expand Up @@ -212,17 +208,6 @@ module LavinMQ
perm != /^$/ && perm != // && perm.matches? name
end

private def hash_algorithm(hash)
case hash
when /^\$2a\$/ then "Bcrypt"
when /^\$1\$/ then "MD5"
when /^\$5\$/ then "SHA256"
when /^\$6\$/ then "SHA512"
when /^$/ then ""
else raise UnknownHashAlgoritm.new
end
end

class UnknownHashAlgoritm < Exception; end
end
end
16 changes: 0 additions & 16 deletions src/stdlib/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,6 @@ class Channel(T)
@lock.unlock
end

def try_receive : T?
@lock.lock
state, value = receive_internal
case state
in .delivered?
raise "BUG: Unexpected UseDefault value for delivered receive" if value.is_a?(UseDefault)
value
in .closed?
raise ClosedError.new
in .none?
nil
end
ensure
@lock.unlock
end

def try_receive? : T?
@lock.lock
state, value = receive_internal
Expand Down
5 changes: 0 additions & 5 deletions src/stdlib/io_memory.cr

This file was deleted.

0 comments on commit 4c7bf8a

Please sign in to comment.