diff --git a/src/lavinmq/client/channel.cr b/src/lavinmq/client/channel.cr index 4280683aec..13339a70ea 100644 --- a/src/lavinmq/client/channel.cr +++ b/src/lavinmq/client/channel.cr @@ -435,22 +435,10 @@ module LavinMQ notify_has_capacity(count) end - def unacked_for_queue(queue) : Iterator(SegmentPosition) - @unacked.each.select(&.queue.==(queue)).map(&.sp) - end - def unacked_count @unacked.size end - def each_unacked(& : Unack -> Nil) - @unack_lock.synchronize do - @unacked.each do |unack| - yield unack - end - end - end - record TxAck, delivery_tag : UInt64, multiple : Bool, negative : Bool, requeue : Bool @tx_acks = Array(TxAck).new diff --git a/src/lavinmq/client/channel/consumer.cr b/src/lavinmq/client/channel/consumer.cr index e7d1c5dd60..b47ba9f3b2 100644 --- a/src/lavinmq/client/channel/consumer.cr +++ b/src/lavinmq/client/channel/consumer.cr @@ -160,10 +160,6 @@ module LavinMQ end end - def name - @tag - end - # blocks until the consumer can accept more messages private def wait_for_capacity : Nil if @prefetch_count > 0 diff --git a/src/lavinmq/client/client.cr b/src/lavinmq/client/client.cr index 76232850fd..5461caeb83 100644 --- a/src/lavinmq/client/client.cr +++ b/src/lavinmq/client/client.cr @@ -62,22 +62,6 @@ module LavinMQ spawn read_loop, name: "Client#read_loop #{@remote_address}" end - # socket's file descriptor - def fd - case @socket - when OpenSSL::SSL::Socket - @socket.as(OpenSSL::SSL::Socket).@bio.io.as(IO::FileDescriptor).fd - when TCPSocket - @socket.as(TCPSocket).fd - when UNIXSocket - @socket.as(UNIXSocket).fd - when WebSocketIO - @socket.as(WebSocketIO).fd - else - raise "Unexpected socket #{@socket.class}" - end - end - # Returns client provided connection name if set, else server generated name def client_name @client_properties["connection_name"]?.try(&.as(String)) || @name diff --git a/src/lavinmq/http/controller.cr b/src/lavinmq/http/controller.cr index 97440b1185..b793a42093 100644 --- a/src/lavinmq/http/controller.cr +++ b/src/lavinmq/http/controller.cr @@ -227,13 +227,6 @@ module LavinMQ end end - private def refuse_unless_monitoring(context, user) - unless user.tags.any? { |t| t.administrator? || t.monitoring? } - Log.warn { "user=#{user.name} does not have monitoring access" } - access_refused(context) - end - end - private def refuse_unless_administrator(context, user : User) unless user.tags.any? &.administrator? Log.warn { "user=#{user.name} does not have administrator access" } diff --git a/src/lavinmq/http/handler/websocket.cr b/src/lavinmq/http/handler/websocket.cr index e09fc5aad0..9fed47a605 100644 --- a/src/lavinmq/http/handler/websocket.cr +++ b/src/lavinmq/http/handler/websocket.cr @@ -55,11 +55,5 @@ module LavinMQ def read_timeout=(timeout) @r.read_timeout = timeout end - - def fd - io = @ws.@ws.@io - return io.fd if io.responds_to?(:fd) - 0 - end end end diff --git a/src/lavinmq/http/http_server.cr b/src/lavinmq/http/http_server.cr index 8f55da539f..371ce735df 100644 --- a/src/lavinmq/http/http_server.cr +++ b/src/lavinmq/http/http_server.cr @@ -45,11 +45,6 @@ module LavinMQ @http = ::HTTP::Server.new(handlers) end - def bind(socket) - addr = @http.bind(socket) - Log.info { "Bound to #{addr}" } - end - def bind_tcp(address, port) addr = @http.bind_tcp address, port Log.info { "Bound to #{addr}" } @@ -83,10 +78,6 @@ module LavinMQ File.delete?(INTERNAL_UNIX_SOCKET) end - def closed? - @http.closed? - end - class NotFoundError < Exception; end class ExpectedBodyError < ArgumentError; end diff --git a/src/lavinmq/message.cr b/src/lavinmq/message.cr index c0bf2f60a3..d66ecc30c9 100644 --- a/src/lavinmq/message.cr +++ b/src/lavinmq/message.cr @@ -54,15 +54,6 @@ module LavinMQ body = bytes[pos, sz] BytesMessage.new(ts, ex, rk, pr, sz, body) end - - def to_io(io : IO, format = IO::ByteFormat::SystemEndian) - io.write_bytes @timestamp, format - io.write_bytes AMQ::Protocol::ShortString.new(@exchange_name), format - io.write_bytes AMQ::Protocol::ShortString.new(@routing_key), format - io.write_bytes @properties, format - io.write_bytes @bodysize, format - io.write @body - end end # Messages from publishers, read from socket and then written to mmap files @@ -87,16 +78,6 @@ module LavinMQ @properties.bytesize + sizeof(UInt64) + @bodysize end - def dlx : String? - @properties.headers.try(&.fetch("x-dead-letter-exchange", nil).as?(String)) - end - - def delay : UInt32? - @properties.headers.try(&.fetch("x-delay", nil)).as?(Int).try(&.to_u32) - rescue OverflowError - nil - end - def to_io(io : IO, format = IO::ByteFormat::SystemEndian) io.write_bytes @timestamp, format io.write_bytes AMQ::Protocol::ShortString.new(@exchange_name), format diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index 9356c57712..e864c43064 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -42,11 +42,6 @@ class MFile < IO end end - # Opens an existing file in readonly mode - def self.open(path) : self - self.new(path) - end - # Opens an existing file in readonly mode def self.open(path, & : self -> _) mfile = self.new(path) @@ -75,12 +70,6 @@ class MFile < IO stat.st_size.to_i64 end - def disk_usage : Int64 - code = LibC.fstat(@fd, out stat) - raise File::Error.from_errno("Unable to get info", file: @path) if code < 0 - stat.st_blocks.to_i64 * 512 - end - private def mmap(length = @capacity) : Pointer(UInt8) return Pointer(UInt8).null if length.zero? protection = case @@ -175,17 +164,6 @@ class MFile < IO raise RuntimeError.from_errno("msync") if code < 0 end - private def mremap(addr, old_len, new_len) : Pointer(UInt8) - {% if flag?(:linux) %} - ptr = LibC.mremap(addr, old_len, new_len, LibC::MREMAP_MAYMOVE) - raise IO::Error.from_errno("mremap") if ptr == LibC::MAP_FAILED - ptr.as(UInt8*) - {% else %} - munmap(addr, old_len) - mmap(addr, new_len) - {% end %} - end - def finalize LibC.close(@fd) if @fd > -1 LibC.munmap(@buffer, @capacity) unless @buffer.null? @@ -271,46 +249,9 @@ class MFile < IO DontNeed end - def truncate(capacity : Int) : Nil - return if capacity == @capacity - capacity = capacity.to_i64 - code = LibC.ftruncate(@fd, capacity) - raise File::Error.from_errno("Error truncating file", file: @path) if code < 0 - old_capacity = @capacity - @capacity = capacity - @size = capacity if @size > capacity - @pos = capacity if @pos > capacity - @buffer = mremap(@buffer, old_capacity, capacity) unless @buffer.null? - end - def resize(new_size : Int) : Nil raise ArgumentError.new("Can't expand file larger than capacity, use truncate") if new_size > @capacity @size = new_size.to_i64 @pos = new_size.to_i64 if @pos > new_size end - - def rename(new_path) - File.rename @path, new_path - @path = new_path - end - - def read_at(offset : Int, slice : Bytes) - total = slice.size - until slice.empty? - cnt = LibC.pread(@fd, slice, slice.size, offset) - raise File::Error.from_errno("pread", file: @path) if cnt < 0 - break if cnt.zero? - slice += cnt - offset += cnt - end - total - slice.size - end - - PAGESIZE = LibC.sysconf(LibC::SC_PAGESIZE).to_u32 - - private def page_align(n : Int) : Int - n += PAGESIZE - 1 - n -= n & PAGESIZE - 1 - n - end end diff --git a/src/lavinmq/parameter_store.cr b/src/lavinmq/parameter_store.cr index 8759ea5f1b..f9a5926d48 100644 --- a/src/lavinmq/parameter_store.cr +++ b/src/lavinmq/parameter_store.cr @@ -12,11 +12,6 @@ module LavinMQ forward_missing_to @parameters - def create(data : JSON::Any, save = true) - p = T.from_json(data) - create(p, save) - end - def create(parameter : T, save = true) @parameters[parameter.name] = parameter save! if save @@ -50,10 +45,6 @@ module LavinMQ end end - def close - save! - end - def to_json(json : JSON::Builder) json.array do each_value do |p| diff --git a/src/lavinmq/proxy_protocol.cr b/src/lavinmq/proxy_protocol.cr index c1f4834752..21f5f49d43 100644 --- a/src/lavinmq/proxy_protocol.cr +++ b/src/lavinmq/proxy_protocol.cr @@ -47,12 +47,6 @@ module LavinMQ ensure io.read_timeout = nil end - - def self.encode(header, io) - ipv = header.src.family.inet6? ? 6 : 4 - io.print "PROXY TCP#{ipv} #{header.src.address} #{header.dst.address} #{header.src.port} #{header.dst.port}\r\n" - io.flush - end end module V2 diff --git a/src/lavinmq/queue/queue.cr b/src/lavinmq/queue/queue.cr index 47c276dce7..2c7f935aa9 100644 --- a/src/lavinmq/queue/queue.cr +++ b/src/lavinmq/queue/queue.cr @@ -294,10 +294,6 @@ module LavinMQ @msg_store.empty? end - def any? : Bool - !empty? - end - def consumer_count @consumers.size.to_u32 end diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr index 0226e38ced..6ee7b4f5a5 100644 --- a/src/lavinmq/queue/stream_queue_message_store.cr +++ b/src/lavinmq/queue/stream_queue_message_store.cr @@ -195,11 +195,6 @@ module LavinMQ start_size - @size end - private def update_stat_per_msg(seg, ts, bytesize) - super - @segment_last_ts[seg] = last_ts - end - def delete(sp) : Nil raise "Only full segments should be deleted" end diff --git a/src/lavinmq/schema.cr b/src/lavinmq/schema.cr index 8a862d1cf0..f12faf75e4 100644 --- a/src/lavinmq/schema.cr +++ b/src/lavinmq/schema.cr @@ -292,12 +292,6 @@ module LavinMQ file.flush version end - - def self.verify_or_prefix(file, type) : Int32 - verify(file, type) - rescue IO::EOFError - prefix(file, type) - end end class OutdatedSchemaVersion < Exception diff --git a/src/lavinmq/shovel/shovel.cr b/src/lavinmq/shovel/shovel.cr index d9fb4c6651..13afdccd0e 100644 --- a/src/lavinmq/shovel/shovel.cr +++ b/src/lavinmq/shovel/shovel.cr @@ -372,10 +372,6 @@ module LavinMQ Log.info &.emit("Terminated", name: @name, vhost: @vhost.name) end - def delete - terminate - end - def terminated? @state.terminated? end diff --git a/src/lavinmq/shovel/shovel_store.cr b/src/lavinmq/shovel/shovel_store.cr index 3b1b908230..b8693287d6 100644 --- a/src/lavinmq/shovel/shovel_store.cr +++ b/src/lavinmq/shovel/shovel_store.cr @@ -41,12 +41,6 @@ module LavinMQ end end - def each(&) - @shovels.each_value do |v| - yield v - end - end - private def destination(name, config, ack_mode, delete_after, prefetch) uri = URI.parse(config["dest-uri"].as_s) case uri.scheme diff --git a/src/lavinmq/user.cr b/src/lavinmq/user.cr index d86a878831..299b2459fb 100644 --- a/src/lavinmq/user.cr +++ b/src/lavinmq/user.cr @@ -10,10 +10,6 @@ module LavinMQ PolicyMaker Impersonator - def to_json(json : JSON::Builder) - to_s.downcase.to_json(json) - end - def self.parse_list(list : String) : Array(Tag) list.split(",").compact_map { |t| Tag.parse?(t.strip) } end @@ -212,17 +208,6 @@ module LavinMQ perm != /^$/ && perm != // && perm.matches? name end - private def hash_algorithm(hash) - case hash - when /^\$2a\$/ then "Bcrypt" - when /^\$1\$/ then "MD5" - when /^\$5\$/ then "SHA256" - when /^\$6\$/ then "SHA512" - when /^$/ then "" - else raise UnknownHashAlgoritm.new - end - end - class UnknownHashAlgoritm < Exception; end end end diff --git a/src/stdlib/channel.cr b/src/stdlib/channel.cr index fbe4ac76f9..28c575462f 100644 --- a/src/stdlib/channel.cr +++ b/src/stdlib/channel.cr @@ -34,22 +34,6 @@ class Channel(T) @lock.unlock end - def try_receive : T? - @lock.lock - state, value = receive_internal - case state - in .delivered? - raise "BUG: Unexpected UseDefault value for delivered receive" if value.is_a?(UseDefault) - value - in .closed? - raise ClosedError.new - in .none? - nil - end - ensure - @lock.unlock - end - def try_receive? : T? @lock.lock state, value = receive_internal diff --git a/src/stdlib/io_memory.cr b/src/stdlib/io_memory.cr deleted file mode 100644 index 0940531458..0000000000 --- a/src/stdlib/io_memory.cr +++ /dev/null @@ -1,5 +0,0 @@ -class IO::Memory - def capacity - @capacity - end -end