Process dependencies? #43

Closed
tetherit opened this issue Mar 9, 2014 · 10 comments
@tetherit

tetherit commented Mar 9, 2014

Is there any way to tell one process in a group to start only once another process in that group has already started?

@kostya
Owner

kostya commented Mar 9, 2014

Hi, there is no such option at the moment, because it would make things complicated.
Processes in a group have only one kind of "dependency": the chain.

@kostya kostya added to_think and removed to_think labels Mar 9, 2014
@tetherit
Author

What about process pre-conditions instead of dependencies? For example, I have a process that records from a camera, but I want it to start only if eye can ping the camera. Maybe something like:

pre_check :ping, every: 30.seconds, addr: "192.168.88.12"

This check would run before the process enters the "starting" state, which would also leave room for custom pre_checks, such as whether the PID of another process is running.

What do you think?

@kostya
Owner

kostya commented Mar 10, 2014

I will think about it.
For now you can use a trigger:
:a starts with a grace of 30.seconds, and :b waits until :a becomes :up (60 is the timeout here)

Eye.app :delay do

  process :a do
    pid_file "/tmp/a.pid"
    start_command "sleep 10000"
    start_grace 30.seconds
    daemonize true
  end

  process :b do
    pid_file "/tmp/b.pid"
    start_command "sleep 10000"
    daemonize true

    trigger :transition, :to => :starting, :do => -> {
      a = Eye::Control.process_by_name('a')
      process.wait_for_condition(60, 1.0) do
        info "wait for up :a"
        a.state_name == :up
      end
    }
  end

end
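
The polling pattern behind this workaround can be sketched in plain Ruby, so it runs without eye. This is only an illustration: `wait_until` is a hypothetical stand-in, not eye's `wait_for_condition`, and it assumes the second argument is the polling interval and that the block returns a truthy value once the condition holds.

```ruby
# Hypothetical helper (not eye's implementation): poll a block until it
# returns a truthy value or the timeout elapses.
# Returns true if the condition was met, false on timeout.
def wait_until(timeout, step = 0.1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    return true if yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep step
  end
end

# Usage: the condition becomes true on the third poll.
polls = 0
ready = wait_until(1, 0.01) { (polls += 1) >= 3 }

# Usage: the condition never holds, so the call gives up after the timeout.
timed_out = wait_until(0.05, 0.01) { false }
```

The block is always evaluated at least once, so a condition that is already true returns immediately without sleeping.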

@tetherit
Author

Thank you for the workaround, I will play around with it! I'm guessing I would also need a trigger on :restarting in case the camera goes offline.

It would be great to have a pre_check option that works exactly like the current check option, except that it waits for the check to pass before starting the process, kills the process when the check fails, and does not attempt to start it again until the check passes, if that makes sense.
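
To make the requested behaviour concrete, here is a minimal sketch of the decision logic in plain Ruby. It is purely illustrative: `pre_check_action` and its return symbols are hypothetical names, and nothing here is an existing eye option.

```ruby
# Hypothetical decision function for the proposed pre_check behaviour.
# Given whether the process is running and whether the check currently
# passes, return what a supervisor would do next.
def pre_check_action(running:, check_ok:)
  if check_ok
    running ? :keep_running : :start
  else
    running ? :stop : :wait
  end
end

# Walk through a camera that is offline, comes online, then drops again.
running = false
actions = [false, true, true, false, false].map do |check_ok|
  action = pre_check_action(running: running, check_ok: check_ok)
  running = true if action == :start
  running = false if action == :stop
  action
end
# actions => [:wait, :start, :keep_running, :stop, :wait]
```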

kostya added a commit that referenced this issue May 12, 2014
@kostya
Owner

kostya commented May 12, 2014

Dependencies were added in 0.6.rc.

@kostya kostya closed this as completed May 12, 2014
@tetherit
Author

Awesome! Will play around!

@tetherit
Author

It seems this broke the following piece of code. Since upgrading to the latest trunk, I never get the "Waiting for camera #{url} to be up" message in the logs at all:

  trigger :transition, to: :starting, do: -> {
    month = 60 * 60 * 24 * 28
    process.wait_for_condition(month, 15) do
      info "Waiting for camera #{url} to be up"
      timeout = 5 * 1_000_000 # 5 seconds in microseconds
      system("ffmpeg -loglevel warning -y -stimeout #{timeout} " \
        "-i #{url} -c:v copy -an -vframes 1 -f rawvideo /dev/null")
    end
  }

Any ideas?

@kostya
Owner

kostya commented May 13, 2014

Please show the full config.

@tetherit
Author

It's long, but here you go :)

require 'active_support/all'
require 'rest-client'
require 'fileutils'

# Prepare PATHS
root = File.expand_path('../', __FILE__)
PATHS = {
  root: root,
  log: File.join(root, 'log', 'recorder'),
  pids: File.join(root, 'tmp', 'pids', 'recorder'),
  tmp: File.join(root, 'tmp', 'recorder') }
PATHS.values.each { |p| FileUtils.mkdir_p p }

require File.join(root, 'eye', 'rtsp_checker')

# Configure Eye
Eye.load('./eye/*.rb')
Eye.config do
  logger File.join(root, 'log/eye.log'), 7, 10.megabytes
end

# Get Config & Cameras
CONFIG = YAML.load(
  File.read(File.join(root, 'config/secrets.yml'))
  )['production'].with_indifferent_access
cameras = fetch_cameras(CONFIG[:app_port], CONFIG[:agent_token])

# Our Application
Eye.application 'recorder' do
  working_dir PATHS[:tmp]
  stdall File.join(PATHS[:log], 'trash.log')

  # Defaults for all processes
  trigger :flapping, times: 5, within: 15.seconds, retry_in: 30.seconds
  check :cpu, every: 30.seconds, below: 70, times: 2
  check :memory, every: 30.seconds, below: 300.megabytes, times: 2

  cameras.each do |camera|
    camera = camera.with_indifferent_access

    # Each camera gets a group of processes
    group camera[:id] do
      chain grace: 5.seconds # chained start-restart, one by one.

      # Handle when we are recording audio only
      if camera[:video_device] == 'none'
        record_proxy = nil

        # Start the sound detector process
        process :sound_detector do
          sound_detector_process(camera)
        end

      # Handle when we are recording video
      else
        # Use different record url if specified
        if camera[:record_url].present? && \
          camera[:record_url] != camera[:detect_url]
          record_port = camera[:record_port]
        else
          record_port = camera[:detect_port]
        end

        # Start a proxy for the detect_url
        detect_proxy = "rtsp://localhost:#{camera[:detect_port]}/proxyStream"
        process :proxy do
          proxy_process(
            camera[:detect_url], camera[:detect_port],
            camera[:id], force_tcp: camera[:force_tcp])
        end

        # Use a second proxy if relevant
        if record_port != camera[:detect_port]
          record_proxy = "rtsp://localhost:#{camera[:record_port]}/proxyStream"
          process :proxy2 do
            proxy_process(
              camera[:record_url], camera[:record_port], camera[:id],
              pname: :proxy2, force_tcp: camera[:force_tcp])
          end
        else
          record_proxy = "rtsp://localhost:#{camera[:detect_port]}/proxyStream"
        end

        # Start the detector process
        process :detector do
          detector_process(detect_proxy, camera, camera[:live_port])
        end
      end

      # Start the recorder process (same for with/without audio)
      process :recorder do
        recorder_process(record_proxy, camera)
      end

      # Start the sugar process (which parses detector and recorder outputs)
      process :sugar do
        sugar_process(camera)
      end
    end

  end
end

helpers.py has:

def truncate_log(path)
  File.open(path, 'w') { |file| file.truncate(0) }
end

def fetch_cameras(port, token, retries: 5, delay: 5)
  while retries > 0
    begin
      return ActiveSupport::JSON.decode RestClient.get(
        "http://127.0.0.1:#{port}/cameras", params: { token: token })
    rescue Errno::ECONNREFUSED
      puts "Connection refused, retrying in #{delay} seconds... " \
        "(#{retries} tries left)"
      sleep delay
      retries -= 1
    end
  end
end

def wait_for_camera(url)
  trigger :transition, to: :starting, do: -> {
    month = 60 * 60 * 24 * 28
    process.wait_for_condition(month, 15) do
      info "Waiting for camera #{url} to be up"
      timeout = 5 * 1_000_000 # 5 seconds in microseconds
      system("ffmpeg -loglevel warning -y -stimeout #{timeout} " \
        "-i #{url} -c:v copy -an -vframes 1 -f rawvideo /dev/null")
    end
  }
end

def default_process(process, id, daemonize: true)
  pid_file File.join(PATHS[:pids], "#{process}-#{id}.pid")
  stdall File.join(PATHS[:log], "#{process}-#{id}.log")
  daemonize true if daemonize
  # stop_signals [:QUIT, 2.seconds, :TERM, 1.seconds, :KILL]
end

def proxy_process(url, port, id, pname: :proxy, force_tcp: false)
  cmd = "live555ProxyServer -p #{port} -V"
  cmd += ' -t' if force_tcp
  rtsp_url = "rtsp://127.0.0.1:#{port}/proxyStream"
  wait_for_camera(url)

  default_process(pname, id)
  start_command "#{cmd} #{url}"
  check :rtsp, every: 30.seconds, times: 2, addr: rtsp_url
end

def detector_process(url, camera, live_port)
  depend_on 'proxy'
  detector = File.join(PATHS[:root], 'vendor/xanDetector/xanDetector.py')
  truncate_log File.join(PATHS[:log], "detector-#{camera[:id]}.log")
  default_process(:detector, camera[:id])
  start_command "python '#{detector}' '#{url}' '#{camera[:id]}' '#{live_port}'"

  check :ctime, every: 1.minute, times: 2,
                file: File.join(PATHS[:log], "detector-#{camera[:id]}.log")
end

def sugar_process(camera)
  depend_on 'proxy'
  sugar = File.join(PATHS[:root], 'vendor/xanDetector/xanSugar.rb')
  detector_log = File.join(PATHS[:log], "detector-#{camera[:id]}.log")

  default_process(:sugar, camera[:id])
  start_command "ruby '#{sugar}' '#{camera[:id]}' '#{detector_log}'"
end

def recorder_process(url, camera)
  depend_on 'proxy'
  cmd = 'ffmpeg -fflags +genpts -use_wallclock_as_timestamps 1 -loglevel warning'

  # Handle audio
  case camera[:audio_device]
  when 'onboard'
    cmd += ' -f alsa -ac 2 -i plug:onboard'
  when 'ice1712'
    cmd += ' -f alsa -ac 12 -i plug:ice1712'
  when 'none'
    cmd += ' -an'
  end

  # Video Inputs
  case camera[:video_device]
  when 'auto'
    cmd += " -rtsp_transport tcp -i #{url}"
  when 'none'
    cmd += ' -vn'
  end

  cmd += ' -dn' # Data streams are not supported

  # Add correct mapping for audio/video (prioritising onboard/card input)
  if %w[onboard ice1712].include?(camera[:audio_device]) &&
    camera[:video_device] != 'none'
    cmd += ' -map 0:0 -map 1'
  else
    cmd += ' -map 0'
  end

  # Handle audio encoding
  unless camera[:audio_device] == 'none'
    cmd += " -codec:a libfdk_aac -flags +qscale -global_quality 1 " \
      "-afterburner 1 -ar 44100"
  end

  # Add audio filters if we specified a channel
  if %w[onboard ice1712].include?(camera[:audio_device]) &&
      camera[:audio_channel] != 'auto'
    cmd += " -filter:a \"aformat=channel_layouts=0xFFF,pan=1|c0=" \
      "c#{camera[:audio_channel] - 1}\""
  end

  # Handle video encoding
  unless camera[:video_device] == 'none'
    if camera[:transcode] == true
      cmd += ' -codec:v libx264 -preset ultrafast -crf ' +
        camera[:transcode_quality].to_s
    else
      cmd += ' -codec:v copy'
    end
  end

  # Segment stuff
  cmd += ' -f segment -segment_time 60 -segment_wrap 10 -segment_list_flags'
  cmd += ' live -segment_list_size 10 -reset_timestamps 1'
  cmd += " -segment_list '#{camera[:id]}.csv' -y '#{camera[:id]}_%02d.mkv'"

  ## Examples:
  # Video with no sound
  # ffmpeg -rtsp_transport tcp -i rtsp://127.0.0.1:10152/proxyStream -dn -an -map 0 -codec:v copy -f segment -segment_time 60 -segment_wrap 10 -segment_list_flags live -segment_list_size 10 -reset_timestamps 1 -segment_list test.csv -y test_%02d.mkv

  # Video with ice1712, channel 4
  # ffmpeg -f alsa -ac 12 -i plug:ice1712 -rtsp_transport tcp -i rtsp://127.0.0.1:10152/proxyStream -dn -map 0:0 -map 1 -codec:a libfdk_aac -flags +qscale -global_quality 1 -afterburner 1 -ar 44100 -filter:a "aformat=channel_layouts=0xFFF,pan=1|c0=c3" -codec:v copy -f segment -segment_time 60 -segment_wrap 10 -segment_list_flags live -segment_list_size 10 -reset_timestamps 1 -segment_list test.csv -y test_%02d.mkv

  # Video with auto sound (from camera)
  # ffmpeg -rtsp_transport tcp -i rtsp://127.0.0.1:10152/proxyStream -dn -map 0 -codec:a libfdk_aac -flags +qscale -global_quality 1 -afterburner 1 -ar 44100 -codec:v copy -f segment -segment_time 60 -segment_wrap 10 -segment_list_flags live -segment_list_size 10 -reset_timestamps 1 -segment_list test.csv -y test_%02d.mkv

  default_process(:recorder, camera[:id])
  start_command cmd
  check :ctime, every: 1.minute, times: 2,
                file: File.join(PATHS[:tmp], "#{camera[:id]}.csv")
end

This worked before upgrading to latest trunk, but now the wait_for_camera method (as shown in the previous comment) does not seem to do anything.

Any ideas?
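
A side note on `fetch_cameras` in the helpers above, possibly unrelated to the trigger problem: when every retry fails, the `while` loop ends and the method returns nil, so `cameras.each` in the main config would raise a NoMethodError. One way to surface the real error instead is sketched below. `with_retries` is a hypothetical helper and the simulated failure is only for demonstration.

```ruby
# Hypothetical helper: retry a block on the listed errors, then re-raise
# the last error instead of silently returning nil once attempts run out.
def with_retries(attempts:, delay:, on: [StandardError])
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *on => e
    raise e if tries >= attempts
    sleep delay
    retry
  end
end

# Usage: simulate a service that refuses the first two connections.
result = with_retries(attempts: 3, delay: 0.01, on: [Errno::ECONNREFUSED]) do |try|
  raise Errno::ECONNREFUSED if try < 3
  "cameras fetched on try #{try}"
end
```

The block receives the attempt number, which keeps the "tries left" style of logging from the original helper possible without a shared counter.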

@tetherit
Author

The eye.log shows this:

[2014-05-13 17:57:49 +0100] [Eye] => command: start
[2014-05-13 17:57:49 +0100] [Eye] <= command: start  (0.014613s)
[2014-05-13 17:58:02 +0100] [Eye] => command: start proxy
[2014-05-13 17:58:02 +0100] [recorder:52ab35a548616311d3360000:proxy] schedule :start (reason: start by user)
[2014-05-13 17:58:02 +0100] [recorder:52ab35a548616311d3360000:proxy] => start  (reason: start by user)
[2014-05-13 17:58:02 +0100] [recorder:52ab35a548616311d3360000:proxy] pid_file not found, starting...
[2014-05-13 17:58:02 +0100] [recorder:52ab35a548616311d3360000:proxy] switch :starting [:unmonitored => :starting] (reason: start by user)
[2014-05-13 17:58:02 +0100] [recorder:52ab35a548616311d3360000:proxy] daemonizing: `live555ProxyServer -p 10101 -V rtsp://username:password@192.168.88.13/media/video1` with start_grace: 2.5s, env: '', <39993> (in /Users/hackeron/Development/Xanview/xanAgent/tmp/recorder)

So as you can see the trigger :transition, to: :starting, do: -> { ... is not triggered anymore.

EDIT: it looks like this line is ignored now too, and the rtsp check no longer runs: check :rtsp, every: 30.seconds, times: 2, addr: rtsp_url
