Skip to content

A Rails engine to reliably deliver loosely-ordered schematized events to Amazon Kinesis via Delayed::Job

License

Notifications You must be signed in to change notification settings

Betterment/journaled

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

86 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Journaled

A Rails engine to durably deliver schematized events to Amazon Kinesis via ActiveJob.

More specifically, journaled is composed of three opinionated pieces: schema definition/validation via JSON Schema, transactional enqueueing via ActiveJob (specifically, via a DB-backed queue adapter), and event transmission via Amazon Kinesis. Our current use-cases include transmitting audit events for durable storage in S3 and/or analytical querying in Amazon Redshift.

Journaled provides an at-least-once event delivery guarantee assuming ActiveJob's queue adapter is not configured to delete jobs on failure.

Note: Do not use the journaled gem to build an event sourcing solution as it does not guarantee total ordering of events. It's possible we'll add scoped ordering capability at a future date (and would gladly entertain pull requests), but it is presently only designed to provide a durable, eventually consistent record that discrete events happened.

See upgrades below if you're upgrading from an older journaled version!

Installation

  1. If you haven't already, configure ActiveJob to use one of the following queue adapters:

    • :delayed_job (via delayed_job_active_record)
    • :que
    • :good_job
    • :delayed

    Ensure that your queue adapter is not configured to delete jobs on failure.

    If you launch your application in production mode and the gem detects that ActiveJob::Base.queue_adapter is not in the above list, it will raise an exception and prevent your application from performing unsafe journaling.

  2. To integrate Journaled into your application, simply include the gem in your app's Gemfile.

    gem 'journaled'

    If you use rspec, add the following to your rails helper:

    # spec/rails_helper.rb
    
    # ... your other requires
    require 'journaled/rspec'
  3. You will need to set the following config in an initializer to allow Journaled to publish events to your AWS Kinesis event stream:

    Journaled.default_stream_name = "my_app_#{Rails.env}_events"

    You may also define a #journaled_stream_name method on Journaled::Event instances:

    def journaled_stream_name
      "my_app_#{Rails.env}_alternate_events"
    end
  4. You may also need to define environment variables to allow Journaled to publish events to your AWS Kinesis event stream:

    You may optionally define the following ENV vars to specify AWS credentials outside of the locations that the AWS SDK normally looks:

    • RUBY_AWS_ACCESS_KEY_ID
    • RUBY_AWS_SECRET_ACCESS_KEY

    You may also specify the region to target your AWS stream by setting AWS_DEFAULT_REGION. If you don't specify, Journaled will default to us-east-1.

    You may also specify a role that the Kinesis AWS client can assume:

    • JOURNALED_IAM_ROLE_ARN

    The AWS principal whose credentials are in the environment will need to be allowed to assume this role.

Usage

Configuration

Journaling provides a number of different configuation options that can be set in Ruby using an initializer. Those values are:

Journaled.default_stream_name

This is described in the "Installation" section above, and is used to specify which stream name to use.

Journaled.job_priority (default: 20)

This can be used to configure what priority the ActiveJobs are enqueued with. This will be applied to all the Journaled::DeliveryJobs that are created by this application. Ex: Journaled.job_priority = 14

Note that job priority is only supported on Rails 6.0+. Prior Rails versions will ignore this parameter and enqueue jobs with the underlying ActiveJob adapter's default priority.

Journaled.http_idle_timeout (default: 1 second)

The number of seconds a persistent connection is allowed to sit idle before it should no longer be used.

Journaled.http_open_timeout (default: 2 seconds)

The number of seconds before the :http_handler should timeout while trying to open a new HTTP session.

Journaled.http_read_timeout (default: 60 seconds)

The number of seconds before the :http_handler should timeout while waiting for a HTTP response.

ActiveJob set options

Both model-level directives accept additional options to be passed into ActiveJob's set method:

# For change journaling:
journal_changes_to :email, as: :identity_change, enqueue_with: { priority: 10 }

# For audit logging:
has_audit_log enqueue_with: { priority: 30 }

# Or for custom journaling:
journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }

Attribution

Before using Journaled::Changes or Journaled::AuditLog, you will want to set up automatic "actor" attribution (i.e. tracking the current user session). To enable this feature, add the following to your controller base class for attribution:

class ApplicationController < ActionController::Base
  include Journaled::Actor

  self.journaled_actor = :current_user # Or your authenticated entity
end

Your authenticated entity must respond to #to_global_id, which ActiveRecords do by default. This feature relies on ActiveSupport::CurrentAttributes under the hood.

Change Journaling with Journaled::Changes

Out of the box, Journaled provides an event type and ActiveRecord mix-in for durably journaling changes to your model, implemented via ActiveRecord hooks. Use it like so:

class User < ApplicationRecord
  include Journaled::Changes

  journal_changes_to :email, :first_name, :last_name, as: :identity_change
end

Every time any of the specified attributes is modified, or a User record is created or destroyed, an event will be sent to Kinesis with the following attributes:

  • id - a random event-specific UUID
  • event_type - the constant value journaled_change
  • created_at- when the event was created
  • table_name - the table name backing the ActiveRecord (e.g. users)
  • record_id - the primary key of the record, as a string (e.g. "300")
  • database_operation - one of create, update, delete
  • logical_operation - whatever logical operation you specified in your journal_changes_to declaration (e.g. identity_change)
  • changes - a serialized JSON object representing the latest values of any new or changed attributes from the specified set (e.g. {"email":"mynewemail@example.com"}). Upon destroy, all specified attributes will be serialized as they were last stored.
  • actor - a string (usually a rails global_id) representing who performed the action.

Callback-bypassing database methods like update_all, delete_all, update_columns and delete are intercepted and will require an additional force: true argument if they would interfere with change journaling. Note that the less-frequently-used methods toggle, increment*, decrement*, and update_counters are not intercepted at this time.

Audit Logging with Journaled::AuditLog

Journaled includes a feature for producing audit logs of changes to your model. Unlike Journaled::Changes, which will emit individual sets of changes as "logical" events, Journaled::AuditLog will log all changes in their entirety, unless otherwise told to ignore changes to specific columns.

This behavior is similar to papertrail, audited, and logidze, except instead of storing changes/versions locally (in your application's database), it emits them to Kinesis (as Journaled events).

Audit Log Configuration

To enable audit logging for a given record, use the has_audit_log directive:

class MyModel < ApplicationRecord
  has_audit_log

  # This class will now be audited,
  # but will ignore changes to `created_at` and `updated_at`.
end

To ignore changes to additional columns, use the ignore option:

class MyModel < ApplicationRecord
  has_audit_log ignore: :last_synced_at

  # This class will be audited,
  # and will ignore changes to `created_at`, `updated_at`, and `last_synced_at`.
end

By default, changes to updated_at and created_at will be ignored (since these generally change on every update), but this behavior can be reconfigured:

# change the defaults:
Journaled::AuditLog.default_ignored_columns = %i(createdAt updatedAt)

# or append new defaults:
Journaled::AuditLog.default_ignored_columns += %i(modified_at)

# or disable defaults entirely:
Journaled::AuditLog.default_ignored_columns = []

Subclasses will inherit audit log configs:

class MyModel < ApplicationRecord
  has_audit_log ignore: :last_synced_at
end

class MySubclass < MyModel
  # this class will be audited,
  # and will ignore `created_at`, `updated_at`, and `last_synced_at`.
end

To disable audit logs on subclasses, use skip_audit_log:

class MySubclass < MyModel
  skip_audit_log
end

Subclasses may specify additional columns to ignore (which will be merged into the inherited list):

class MySubclass < MyModel
  has_audit_log ignore: :another_field

  # this class will ignore `another_field`, IN ADDITION TO `created_at`, `updated_at`,
  # and any other fields specified by the parent class.
end

To temporarily disable audit logging globally, use the without_audit_logging directive:

Journaled::AuditLog.without_audit_logging do
  # Any operation in here will skip audit logging
end

Audit Log Events

Whenever an audited record is created, updated, or destroyed, a journaled_audit_log event is emitted. For example, calling user.update!(name: 'Bart') would result in an event that looks something like this:

{
  "id": "bc7cb6a6-88cf-4849-a4f0-a31b0b199c47",
  "event_type": "journaled_audit_log",
  "created_at": "2022-01-28T11:06:54.928-05:00",
  "class_name": "User",
  "table_name": "users",
  "record_id": "123",
  "database_operation": "update",
  "changes": { "name": ["Homer", "Bart"] },
  "snapshot": null,
  "actor": "gid://app_name/AdminUser/456",
  "tags": {}
}

The field breakdown is as follows:

  • id: a randomly-generated ID for the event itself
  • event_type: the type of event (always journaled_audit_log)
  • created_at: the time that the action occurred (should match updated_at on the ActiveRecord)
  • class_name: the name of the ActiveRecord class
  • table_name: the underlying table that the class interfaces with
  • record_id: the primary key of the ActiveRecord
  • database_operation: the type of operation (insert, update, or delete)
  • changes: the changes to the record, in the form of "field_name": ["from_value", "to_value"]
  • snapshot: an (optional) snapshot of all of the record's columns and their values (see below).
  • actor: the current Journaled.actor
  • tags: the current Journaled.tags

Snapshots

When records are created, updated, and deleted, the changes field is populated with only the columns that changed. While this keeps event payload size down, it may make it harder to reconstruct the state of the record at a given point in time.

This is where the snapshot field comes in! To produce a full snapshot of a record as part of an update, set use the virtual _log_snapshot attribute, like so:

my_user.update!(name: 'Bart', _log_snapshot: true)

Or to produce snapshots for all records that change for a given operation, wrap it a with_snapshots block, like so:

Journaled::AuditLog.with_snapshots do
  ComplicatedOperation.run!
end

Snapshots can also be enabled globally for all deletion operations. Since changes will be empty on deletion, you should consider using this if you care about the contents of any records being deleted (and/or don't have a full audit trail from their time of creation):

Journaled::AuditLog.snapshot_on_deletion = true

Events with snapshots will continue to populate the changes field, but will additionally contain a snapshot with the full state of the user:

{
  "...": "...",
  "changes": { "name": ["Homer", "Bart"] },
  "snapshot": { "name": "Bart", "email": "simpson@example.com", "favorite_food": "pizza" },
  "...": "..."
}

Handling Sensitive Data

Both changes and snapshot will filter out sensitive fields, as defined by your Rails.application.config.filter_parameters list:

{
  "...": "...",
  "changes": { "ssn": ["[FILTERED]", "[FILTERED]"] },
  "snapshot": { "ssn": "[FILTERED]" },
  "...": "..."
}

They will also filter out any fields whose name ends in _crypt or _hmac, as well as fields that rely on Active Record Encryption / encrypts (introduced in Rails 7).

This is done to avoid emitting values to locations where it is difficult or impossible to rotate encryption keys (or otherwise scrub values after the fact), and currently there is no built-in configuration to bypass this behavior. If you need to track changes to sensitive/encrypted fields, it is recommended that you store the values in a local history table (still encrypted, of course!).

Caveats

Because Journaled events are not guaranteed to arrive in order, events emitted by Journaled::AuditLog must be sorted by their created_at value, which should correspond roughly to the time that the SQL statement was issued. There is currently no other means of globally ordering audit log events, making them susceptible to clock drift and race conditions.

These issues may be mitigated on a per-model basis via ActiveRecord::Locking::Optimistic (and its auto-incrementing lock_version column), and/or by careful use of other locking mechanisms.

Custom Journaling

For every custom implementation of journaling in your application, define the JSON schema for the attributes in your event. This schema file should live in your Rails application at the top level and should be named in snake case to match the class being journaled. E.g.: your_app/journaled_schemas/my_class.json)

In each class you intend to use Journaled, include the Journaled::Event module and define the attributes you want captured. After completing the above steps, you can call the journal! method in the model code and the declared attributes will be published to the Kinesis stream. Be sure to call journal! within the same transaction as any database side effects of your business logic operation to ensure that the event will eventually be delivered if-and-only-if your transaction commits.

Example:

// journaled_schemas/contract_acceptance_event.json

{
  "type": "object",
  "title": "contract_acceptance_event",
  "required": [
    "user_id",
    "signature"
  ],
  "properties": {
    "user_id": {
      "type": "integer"
    },
    "signature": {
      "type": "string"
    }
  }
}
# app/models/contract_acceptance_event.rb

ContractAcceptanceEvent = Struct.new(:user_id, :signature) do
  include Journaled::Event

  journal_attributes :user_id, :signature
end
# app/models/contract_acceptance.rb

class ContractAcceptance
  include ActiveModel::Model

  attr_accessor :user_id, :signature

  def user
    @user ||= User.find(user_id)
  end

  def contract_acceptance_event
    @contract_acceptance_event ||= ContractAcceptanceEvent.new(user_id, signature)
  end

  def save!
    User.transaction do
      user.update!(contract_accepted: true)
      contract_acceptance_event.journal!
    end
  end
end

An event like the following will be journaled to kinesis:

{
  "id": "bc7cb6a6-88cf-4849-a4f0-a31b0b199c47", // A random event ID for idempotency filtering
  "event_type": "contract_acceptance_event",
  "created_at": "2019-01-28T11:06:54.928-05:00",
  "user_id": 123,
  "signature": "Sarah T. User"
}

Tagged Events

Events may be optionally marked as "tagged." This will add a tags field, intended for tracing and auditing purposes.

class MyEvent
  include Journaled::Event

  journal_attributes :attr_1, :attr_2, tagged: true
end

You may then use Journaled.tag! and Journaled.tagged inside of your ApplicationController and ApplicationJob classes (or anywhere else!) to tag all events with request and job metadata:

class ApplicationController < ActionController::Base
  before_action do
    Journaled.tag!(request_id: request.request_id, current_user_id: current_user&.id)
  end
end

class ApplicationJob < ActiveJob::Base
  around_perform do |job, perform|
    Journaled.tagged(job_id: job.id) { perform.call }
  end
end

This feature relies on ActiveSupport::CurrentAttributes under the hood, so these tags are local to the current thread, and will be cleared at the end of each request request/job.

Helper methods for custom events

Journaled provides a couple helper methods that may be useful in your custom events. You can add whichever you need your event types like this:

# my_event.rb
class MyEvent
  include Journaled::Event

  journal_attributes :commit_hash, :actor_uri # ... etc, etc

  def commit_hash
    Journaled.commit_hash
  end

  def actor_uri
    Journaled.actor_uri
  end

  # ... etc, etc
end

Journaled.commit_hash

If you choose to use it, you must provide a GIT_COMMIT environment variable. Journaled.commit_hash will fail if it is undefined.

Journaled.actor_uri

Returns one of the following in order of preference:

  • The current controller-defined journaled_actor's GlobalID, if set
  • A string of the form gid://[app_name]/[os_username] if performed on the command line
  • a string of the form gid://[app_name] as a fallback

In order for this to be most useful, you must configure your controller as described in Attribution above.

Testing

If you use RSpec, you can test for journaling behaviors with the journal_event(s)_including and journal_changes_to matchers. First, make sure to require journaled/rspec in your spec setup (e.g. spec/rails_helper.rb):

require 'journaled/rspec'

Checking for specific events

The journal_event_including and journal_events_including matchers allow you to check for one or more matching event being journaled:

expect { my_code }
  .to journal_event_including(name: 'foo')
expect { my_code }
  .to journal_events_including({ name: 'foo', value: 1 }, { name: 'foo', value: 2 })

This will only perform matches on the specified fields (and will not match one way or the other against unspecified fields). These matchers will also ignore any extraneous events that are not positively matched (as they may be unrelated to behavior under test).

When writing tests, pairing every positive assertion with a negative assertion is a good practice, and so negative matching is also supported (via both .not_to and .to not_):

expect { my_code }
  .not_to journal_events_including({ name: 'foo' }, { name: 'bar' })
expect { my_code }
  .to raise_error(SomeError)
  .and not_journal_event_including(name: 'foo') # the `not_` variant can chain off of `.and`

Several chainable modifiers are also available:

expect { my_code }.to journal_event_including(name: 'foo')
  .with_schema_name('my_event_schema')
  .with_partition_key(user.id)
  .with_stream_name('my_stream_name')
  .with_enqueue_opts(run_at: future_time)
  .with_priority(999)

All of this can be chained together to test for multiple sets of events with multiple sets of options:

expect { subject.journal! }
  .to journal_events_including({ name: 'event1', value: 300 }, { name: 'event2', value: 200 })
    .with_priority(10)
  .and journal_event_including(name: 'event3', value: 100)
    .with_priority(20)
  .and not_journal_event_including(name: 'other_event')

Checking for Journaled::Changes declarations

The journal_changes_to matcher checks against the list of attributes specified on the model. It does not actually test that an event is emitted within a given codepath, and is instead intended to guard against accidental regressions that may impact external consumers of these events:

it "journals exactly these things or there will be heck to pay" do
  expect(User).to journal_changes_to(:email, :first_name, :last_name, as: :identity_change)
end

Instrumentation

When an event is enqueued, an ActiveSupport::Notification titled journaled.event.enqueue is emitted. Its payload will include the :event and its background job :priority.

This can be forwarded along to your preferred monitoring solution via a Rails initializer:

ActiveSupport::Notifications.subscribe('journaled.event.enqueue') do |*args|
  payload = ActiveSupport::Notifications::Event.new(*args).payload
  journaled_event = payload[:event]

  tags = { priority: payload[:priority], event_type: journaled_event.journaled_attributes[:event_type] }

  Statsd.increment('journaled.event.enqueue', tags: tags.map { |k,v| "#{k.to_s[0..64]}:#{v.to_s[0..255]}" })
end

Upgrades

Since this gem relies on background jobs (which can remain in the queue across code releases), this gem generally aims to support jobs enqueued by the prior gem version.

As such, we always recommend upgrading only one major version at a time.

Upgrading from 4.3.0

Versions of Journaled prior to 5.0 would enqueue events one at a time, but 5.0 introduces a new transaction-aware feature that will bundle up all events emitted within a transaction and enqueue them all in a single "batch" job directly before the SQL COMMIT statement. This reduces the database impact of emitting a large volume of events at once.

This feature can be disabled conditionally:

Journaled.transactional_batching_enabled = false

And can then be enabled via the following block:

Journaled.with_transactional_batching do
  # your code
end

Backwards compatibility has been included for background jobs enqueued by version 4.0 and above, but has been dropped for jobs emitted by versions prior to 4.0. (Again, be sure to upgrade only one major version at a time.)

Upgrading from 3.1.0

Versions of Journaled prior to 4.0 relied directly on environment variables for stream names, but now stream names are configured directly. When upgrading, you can use the following configuration to maintain the previous behavior:

Journaled.default_stream_name = ENV['JOURNALED_STREAM_NAME']

If you previously specified a Journaled.default_app_name, you would have required a more precise environment variable name (substitute {{upcase_app_name}}):

Journaled.default_stream_name = ENV["{{upcase_app_name}}_JOURNALED_STREAM_NAME"]

And if you had defined any journaled_app_name methods on Journaled::Event instances, you can replace them with the following:

def journaled_stream_name
  ENV['{{upcase_app_name}}_JOURNALED_STREAM_NAME']
end

When upgrading from 3.1 or below, Journaled::DeliveryJob will handle any jobs that remain in the queue by accepting an app_name argument. This behavior will be removed in version 5.0, so it is recommended to upgrade one major version at a time.

Upgrading from 2.5.0

Versions of Journaled prior to 3.0 relied direclty on delayed_job and a "performable" class called Journaled::Delivery. In 3.0, this was superceded by an ActiveJob class called Journaled::DeliveryJob, but the Journaled::Delivery class was not removed until 4.0.

Therefore, when upgrading from 2.5.0 or below, it is recommended to first upgrade to 3.1.0 (to allow any Journaled::Delivery jobs to finish working off) before upgrading to 4.0+.

The upgrade to 3.1.0 will require a working ActiveJob config. ActiveJob can be configured globally by setting ActiveJob::Base.queue_adapter, or just for Journaled jobs by setting Journaled::DeliveryJob.queue_adapter. The :delayed_job queue adapter will allow you to continue relying on delayed_job. You may also consider switching your app(s) to delayed and using the :delayed queue adapter.

Future improvements & issue tracking

Suggestions for enhancements to this engine are currently being tracked via Github Issues. Please feel free to open an issue for a desired feature, as well as for any observed bugs.

About

A Rails engine to reliably deliver loosely-ordered schematized events to Amazon Kinesis via Delayed::Job

Resources

License

Stars

Watchers

Forks

Packages

No packages published