Connecting Databus to an Oracle Database
The main purpose of databusification on the Oracle side is to provide a mechanism for getting a timeline-consistent stream of changes to selected tables. Some requirements and non-requirements are:
- The changes should be replayable in the same order in which the database applied them. This ensures that applying the changes up to some point yields the same end image that the database had at that point.
- If a row changes multiple times within a certain time period, it is not necessary to get each intermediate change. As long as the last image as of a certain point (an SCN in this case) is available, that is good enough.
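The two requirements above can be checked against a toy model. The following is a hypothetical Python sketch, not part of Databus. Each change is an `(scn, row_key, image)` tuple, and the helper names `replay` and `last_images` are illustrative. The sketch shows that replaying only the last image of each row up to an SCN produces the same end state as replaying every change up to that SCN.

```python
from typing import Dict, List, Tuple

# Hypothetical change record: (scn, row_key, row_image).
Change = Tuple[int, str, str]


def replay(changes: List[Change]) -> Dict[str, str]:
    """Apply changes in SCN order; a later image of a row overwrites an earlier one."""
    state: Dict[str, str] = {}
    for _scn, key, image in sorted(changes, key=lambda c: c[0]):
        state[key] = image
    return state


def last_images(changes: List[Change], as_of_scn: int) -> List[Change]:
    """Drop intermediate changes: keep only the last image of each row at or below as_of_scn."""
    latest: Dict[str, Change] = {}
    for change in sorted(changes, key=lambda c: c[0]):
        if change[0] <= as_of_scn:
            latest[change[1]] = change
    return sorted(latest.values(), key=lambda c: c[0])


history = [(100, "r1", "a"), (105, "r2", "x"), (110, "r1", "b"), (120, "r1", "c")]
every_change = replay([c for c in history if c[0] <= 115])
compacted = replay(last_images(history, 115))
print(every_change == compacted)       # True
print(len(last_images(history, 115)))  # 2 events instead of 3
```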
Each Oracle table has a pseudo-column called `ora_rowscn` that contains the highest SCN at which a row was modified. By default, Oracle maintains it at block level, but by enabling `rowdependencies` on the table, it can be maintained at row level.
This section lists the main artifacts relevant to the flow. Other artifacts are also needed for the overall functioning, but they are not critical to a high-level understanding.
- Each source table has a `txn` column added to it. Say source table `foo` originally has columns (A, B, C). For databusification, a `txn` column needs to be added to the base table. This column is indexed.
- A `sy$txlog` table is created per database. It keeps track of transactional changes to each databusified source in the database. Its main columns are (scn, txn, mask, timestamp).
- There is a before insert/update trigger on each source that does two things:
  - Uses `sync_core.getTxn()` to get the current transaction ID and inserts it into the source table's `txn` column.
  - Inserts or updates a row in `sy$txlog` with `txn` set to the new txn ID, `scn` initialized to Infinity (9999999), and a new mask.
- There is a coalesce job that runs in the background every N seconds (currently 2 seconds). It updates the rows in `sy$txlog` that have scn = Infinity to `ora_rowscn`.
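The trigger's insert-or-update behaviour on `sy$txlog` can be sketched as follows. This is a hypothetical Python model, not the PL/SQL in `sync_core`. It assumes that `getTxn()` recognizes a new transaction by comparing `DBMS_TRANSACTION.LOCAL_TRANSACTION_ID` with the last value it saw, as the `lastTxID` variable described below suggests. It also assumes that source bit `b` contributes `2**b` to the mask and that `9999999` stands for Infinity. The class and method names are illustrative.

```python
INFINITY = 9999999  # assumed stand-in for the "Infinity" SCN marker


class SyncCoreModel:
    """Hypothetical in-memory model of getTxn(); not the real sync_core code."""

    def __init__(self, source_bits):
        self.source_bits = source_bits  # source name -> bit number, as in sy$sources
        self.txlog = {}                 # txn -> {"scn": ..., "mask": ...}, one row per transaction
        self.next_txn = 1               # stands in for the txn ID generator
        self.last_tx_id = None          # stands in for lastTxID
        self.current_txn = None
        self.current_mask = 0

    def get_txn(self, local_tx_id, source):
        bit = 2 ** self.source_bits[source]
        if local_tx_id != self.last_tx_id:
            # First change in a new transaction: allocate a txn and insert a sy$txlog row.
            self.last_tx_id = local_tx_id
            self.current_txn = self.next_txn
            self.next_txn += 1
            self.current_mask = bit
            self.txlog[self.current_txn] = {"scn": INFINITY, "mask": bit}
        elif not self.current_mask & bit:
            # Same transaction, first change to this source: update the row's mask.
            self.current_mask |= bit
            self.txlog[self.current_txn]["mask"] = self.current_mask
        return self.current_txn


core = SyncCoreModel({"S1": 0, "S2": 1})
t1 = core.get_txn("local-A", "S1")
core.get_txn("local-A", "S1")  # second row in S1: nothing new to record
core.get_txn("local-A", "S2")  # S2 joins the same transaction
t2 = core.get_txn("local-B", "S1")
print(core.txlog)  # {1: {'scn': 9999999, 'mask': 3}, 2: {'scn': 9999999, 'mask': 1}}
```

Because `txn` is the primary key of `sy$txlog`, a transaction that touches several sources still produces a single log row, and its mask lists every source involved.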
This is the sequence of steps that occurs from the time a transaction is started to when it is committed, and afterward.
- Txn T1 is started and makes an update to row R1 in source S1.
- A before-update trigger on S1 causes the following to happen:
  - `sync_core.getTxn()` is invoked, and the obtained txn ID (T1) is written to the `txn` column of R1.
  - A new row R2 is added to `sy$txlog` with scn = INFINITY, txn = T1, and the mask set to indicate that S1 is updated.
- Txn T1 commits. At this point, `ora_rowscn` in S1 is updated, and the `ora_rowscn` of the new row R2 in `sy$txlog` is set to the commit SCN of the transaction. The `scn` column of R2 is still INFINITY.
- The coalesce job kicks in after N seconds. It queries for rows in `sy$txlog` with scn == INFINITY and sets scn = ora_rowscn. Therefore, `R2.scn` becomes equal to R2's `ora_rowscn`.
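The commit and coalesce steps can be sketched with a single `sy$txlog` row. This hypothetical Python model is not Databus code. `commit()` stands in for Oracle assigning `ora_rowscn` at commit time, `coalesce_log()` imitates the background job, and `9999999` is assumed as the Infinity marker. An `ora_rowscn` of `None` models a transaction that has not committed yet, whose row the job's session would not see.

```python
INFINITY = 9999999  # assumed stand-in for the "Infinity" SCN marker

# txn -> sy$txlog row; this is R2 from the walkthrough, as written by the trigger.
txlog = {"T1": {"scn": INFINITY, "ora_rowscn": None}}


def commit(txn, commit_scn):
    """Oracle sets ora_rowscn when the transaction commits (row-level on sy$txlog)."""
    txlog[txn]["ora_rowscn"] = commit_scn


def coalesce_log():
    """Copy ora_rowscn into the indexable scn column for committed rows still at INFINITY."""
    updated = 0
    for row in txlog.values():
        if row["scn"] == INFINITY and row["ora_rowscn"] is not None:
            row["scn"] = row["ora_rowscn"]
            updated += 1
    return updated


print(coalesce_log())  # 0: T1 has not committed yet
commit("T1", 5000)
print(txlog["T1"])     # {'scn': 9999999, 'ora_rowscn': 5000}
print(coalesce_log())  # 1
print(txlog["T1"])     # {'scn': 5000, 'ora_rowscn': 5000}
```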
The event stream cannot be ordered by txn; it has to be ordered by SCN. Ordering by txn might lead to an incorrect end state, because the `txn` column is populated at the beginning of a transaction, whereas the SCN is based on `ora_rowscn`, which is assigned at commit time. A transaction that starts earlier may commit later, and the correct ordering of updates is based on commit time.
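A concrete interleaving shows the problem. In this hypothetical Python sketch, T1 gets txn 1 because it changes row A first. T2 then gets txn 2, updates row R, and commits at SCN 200. Only afterwards does T1 update R and commit at SCN 210. The database ends with T1's image of R, but replaying by txn ends with T2's.

```python
from typing import Dict, List, NamedTuple


class TxnEvent(NamedTuple):
    txn: int         # assigned by getTxn() at the transaction's first change
    commit_scn: int  # ora_rowscn, assigned at commit
    row: str
    image: str


def replay_in_order(events: List[TxnEvent]) -> Dict[str, str]:
    """Replay events in the given order; the last image of each row wins."""
    state: Dict[str, str] = {}
    for event in events:
        state[event.row] = event.image
    return state


events = [
    TxnEvent(txn=1, commit_scn=210, row="A", image="a1"),       # T1's first change
    TxnEvent(txn=2, commit_scn=200, row="R", image="from T2"),  # T2 commits first
    TxnEvent(txn=1, commit_scn=210, row="R", image="from T1"),  # T1 commits last
]

by_txn = replay_in_order(sorted(events, key=lambda e: e.txn))
by_scn = replay_in_order(sorted(events, key=lambda e: e.commit_scn))
print(by_txn["R"])  # from T2 (wrong: the database holds T1's image)
print(by_scn["R"])  # from T1 (matches the database)
```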
The `ora_rowscn` column cannot be indexed. Therefore, the indexable `scn` column is needed. The background coalesce job keeps this column updated with the `ora_rowscn` value.
Technically, every table (and hence every source table) has an `ora_rowscn` pseudo-column. For `ora_rowscn` to be accurate at row level, `rowdependencies` has to be enabled on the table. This is an expensive setting. Therefore, instead of enabling it on each source table, it is enabled only on the `sy$txlog` table. Because the same transaction writes the `txn` value to both the source row and the `sy$txlog` row, the `ora_rowscn` of the `sy$txlog` row is set when that transaction commits, and the shared `txn` value links each source row to it.
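The cost of not enabling `rowdependencies` can be illustrated with a simplified model of block-level tracking, in which every row reports the latest commit SCN of any row in its block. This is a hypothetical Python sketch, and the block layout and SCNs are illustrative. Row r1 was last committed at SCN 100, but it appears to have changed at 250 because r2 shares its block. Its `ora_rowscn` therefore says little about when r1 itself changed.

```python
from typing import Dict


def block_level_rowscn(row_commit_scn: Dict[str, int], row_block: Dict[str, int]) -> Dict[str, int]:
    """Without rowdependencies: each row reports the highest commit SCN seen in its block."""
    block_max: Dict[int, int] = {}
    for row, scn in row_commit_scn.items():
        block = row_block[row]
        block_max[block] = max(block_max.get(block, scn), scn)
    return {row: block_max[row_block[row]] for row in row_commit_scn}


# With rowdependencies, ora_rowscn would track row_commit_scn itself.
row_commit_scn = {"r1": 100, "r2": 250, "r3": 180}
row_block = {"r1": 7, "r2": 7, "r3": 8}
print(block_level_rowscn(row_commit_scn, row_block))  # {'r1': 250, 'r2': 250, 'r3': 180}
```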
Transaction ID generator
- Description: Generates txid numbers for `sy$txlog`.
- Defined in: `database/*/createSchema/schema/cdsddl.sqs`
Table `sy$sources`
- Description: Maintains a persistent mapping of a databus source to a bit number.
- Defined in: `database/*/createSchema/schema/cdsddl.tab`
- A maximum of 126 sources is supported because the sources bitmask has to fit in an Oracle NUMBER.

Column | Description |
---|---|
name | the name of the Databus source |
bitnum | the bit number corresponding to this source in the sy$txlog mask column |
- Constraints
  - `sy$sources_pk`: PK constraint on name; declared in `database/*/createSchema/schema/cdsddl.con`
- Indexes
  - `sy$sources_I1`: secondary index on bitnum; declared in `database/*/createSchema/schema/cdsddl.ind`
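The 126-source limit is consistent with simple arithmetic, under two assumptions: that "fit in a NUMBER" refers to the 38 decimal digits of precision of an Oracle `NUMBER`, and that source bit `b` contributes `2**b` to the mask. A mask with bits 0 to 125 set then has 38 digits, while adding bit 126 needs 39. The Python sketch below checks this and shows how a mask could be decoded back to source names. The helper names are hypothetical.

```python
from typing import Dict, Iterable, List

MAX_DIGITS = 38  # assumed: precision of an Oracle NUMBER


def mask_for(bitnums: Iterable[int]) -> int:
    """OR together one bit per source (assumed encoding: bit b -> 2**b)."""
    mask = 0
    for b in bitnums:
        mask |= 1 << b
    return mask


def sources_in(mask: int, bit_to_source: Dict[int, str]) -> List[str]:
    """Names of the sources whose bits are set in mask, in bit order."""
    return [name for bit, name in sorted(bit_to_source.items()) if mask >> bit & 1]


def max_sources(max_digits: int = MAX_DIGITS) -> int:
    """Largest n for which a mask with bits 0..n-1 all set has at most max_digits digits."""
    n = 0
    while len(str(mask_for(range(n + 1)))) <= max_digits:
        n += 1
    return n


print(len(str(mask_for(range(126)))))  # 38
print(len(str(mask_for(range(127)))))  # 39
print(max_sources())                   # 126
print(sources_in(mask_for([0, 2]), {0: "foo", 1: "bar", 2: "baz"}))  # ['foo', 'baz']
```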
Table `sy$txlog`
- Description: Log of changes to databus sources. The table is updated by the UPDATE/INSERT trigger of a databus source.
- Defined in: `database/*/createSchema/schema/cdsddl.tab`
- Misc
  - rowdependencies enabled

Column | Description |
---|---|
txn | transaction id (generated from sy$scn_req) |
scn | System Change Number; the ora_rowscn of this row (the commit SCN of the transaction), copied in by the coalesce job; INFINITY until then |
mask | a mask with the bits of all sources that got updated in the transaction; see also sy$sources |
ts | the transaction timestamp |
- Constraints
  - `sy$txlog_pk`: PK constraint on txn; declared in `database/*/createSchema/schema/cdsddl.con`
- Indexes
  - `sy$txlog_I1`: secondary index on scn; declared in `database/*/createSchema/schema/cdsddl.ind`
Table `sync_core_settings`
- Description: Maintains settings for databus.
- Defined in: `database/*/createSchema/schema/cdsddl.tab`

Column | Description |
---|---|
RAISE_DBMS_ALERTS | if 'N', conditional alerts by signal_beep are disabled |
View `sy$T`
- Description: Defines a view over a table T to be exported through databus.
- Defined in: `database/*/createSchema/schema/cdsddl.vw`

```sql
create or replace force view sy$T as
select txn, f1, f2, ...  -- txn plus the remaining columns of T
from T
```
Package `sync_core`
- Description: Core databus methods
- Defined in: `database/*/createSchema/schema/cdsddl.prc1`

Variable name | Description |
---|---|
lastTxID | the DBMS_TRANSACTION.LOCAL_TRANSACTION_ID of the latest transaction; used in getTxn() to determine transaction boundaries |
currentTxn | the txn column value in sy$txlog for the last updated/inserted row |
currentMask | the mask column value in sy$txlog for the last updated/inserted row |
source_bits | map from a databus source name to a source bitmask |

Method prototype | Description |
---|---|
function getScn(v_scn in number, v_ora_rowscn in number) return number | Implements infinity == v_scn ? v_ora_rowscn : v_scn |
function getMask(source in varchar) return number | Returns the bitmask corresponding to a databus source name. |
function getTxn(source in varchar) return number | Denotes that source is being updated by a transaction. Updates sy$txlog accordingly. Returns the txn of the transaction. |
procedure coalesce_log | Sets the scn field of all records in sy$txlog whose scn is not yet set (still INFINITY) to the corresponding ora_rowscn value. This is needed because ora_rowscn is generated at transaction commit time, i.e., after the row in sy$txlog has been added. |
procedure signal_beep | Raises a sy$alert alert if RAISE_DBMS_ALERTS in sync_core_settings is set to 'Y'. |
unconditional_signal_beep | Raises a sy$alert alert. |
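`getScn` lets a reader obtain the commit SCN of a `sy$txlog` row whether or not the coalesce job has processed it yet. A minimal Python equivalent of the documented expression follows. The function name and the `9999999` Infinity value are assumptions for illustration.

```python
INFINITY = 9999999  # assumed stand-in for the "Infinity" SCN marker


def get_scn(v_scn: int, v_ora_rowscn: int) -> int:
    """infinity == v_scn ? v_ora_rowscn : v_scn"""
    return v_ora_rowscn if v_scn == INFINITY else v_scn


before_coalesce = get_scn(INFINITY, 5000)  # trigger wrote INFINITY; commit set ora_rowscn
after_coalesce = get_scn(5000, 5000)       # coalesce_log copied ora_rowscn into scn
print(before_coalesce, after_coalesce)     # 5000 5000
```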
Alerts package
- Description: Routines for managing alerts from databus sources
- Defined in: `database/*/createSchema/schema/cdsddl.prc`

Variable name | Description |
---|---|
is_registered | Boolean flag: whether the sy$alert alert is registered |

Method prototype | Description |
---|---|
function registerSourceWithVersion(source in varchar, version in number) return number | Registers a source with a version. After registration, all events occurring on this source are returned by waitForEvent. Returns null if the source does not exist (otherwise returns the source). |
procedure unregisterAllSources | Unregisters all sources. After this call, no more events are returned by waitForEvent. |
function waitForEvent(maxWait in number) return varchar | Waits for an event for no longer than the specified timeout (in seconds). Returns the message that was associated with the event. |
Coalesce job
- Description: Runs `sync_core.coalesce_log` every 2 seconds.
- Defined in: `database/*/createSchema/schema/cdsddl.prc`

Alert job
- Description: Runs `sync_core.unconditional_signal_beep` every second.
- Defined in: `database/*/createSchema/schema/cdsddl.prc`
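Trigger `T_DATABUS_TRG`

T is the name of the databus source.

```sql
CREATE TRIGGER T_DATABUS_TRG
  before insert or update on T
  referencing old as old new as new
  for each row
begin
  if (updating and :new.txn < 0) then
    -- An UPDATE that explicitly sets a negative txn keeps that txn (sign flipped back).
    :new.txn := -:new.txn;
  else
    -- Otherwise stamp the row with the current transaction's txn;
    -- getTxn also records the change in sy$txlog.
    :new.txn := sync_core.getTxn('T');
  end if;
end;
```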
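The branch in the trigger can be restated in Python to make its cases explicit. This is a hypothetical sketch. `get_txn` is passed in as a callable standing in for `sync_core.getTxn('T')`, and `None` models a NULL `txn`. In PL/SQL, a NULL `txn` makes `:new.txn < 0` not true, so that case falls through to `getTxn`. The trigger keeps a caller-supplied `txn` only when an UPDATE sets it to a negative value. Inserts, and updates that leave `txn` non-negative, always go through `getTxn`.

```python
from typing import Callable, Optional


def trigger_txn(updating: bool, new_txn: Optional[int], get_txn: Callable[[], int]) -> int:
    """Value the trigger assigns to :new.txn (hypothetical Python rendering)."""
    if updating and new_txn is not None and new_txn < 0:
        # An UPDATE that explicitly sets a negative txn keeps it, sign flipped.
        return -new_txn
    # Every other insert or update is registered through getTxn.
    return get_txn()


calls = []


def fake_get_txn() -> int:
    """Stand-in for sync_core.getTxn('T'); records that it was called."""
    calls.append("getTxn")
    return 42


print(trigger_txn(True, -17, fake_get_txn))  # 17
print(trigger_txn(True, 5, fake_get_txn))    # 42
print(trigger_txn(False, -3, fake_get_txn))  # 42
print(len(calls))                            # 2
```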
Query without chunking
- Configuration settings: chunkedScnThreshold == -1, useRowChunking = N/A
- Query parameters
  - `sinceScn`: the last SCN read from the relay/client

```sql
SELECT /*+ first_rows LEADING(tx) */
       sync_core.getScn(tx.scn, tx.ora_rowscn) scn,
       tx.ts event_timestamp,
       src.*
FROM sy$<SOURCE_VIEW_NAME> src, sy$txlog tx
WHERE src.txn = tx.txn
  AND tx.scn > :sinceScn
  AND tx.ora_rowscn > :sinceScn
```
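The two predicates in the query complement each other. The `tx.scn` predicate can presumably use the index on `scn`, since `ora_rowscn` cannot be indexed. The `tx.ora_rowscn` predicate drops rows that still have `scn` = INFINITY but committed at or before `sinceScn`. This hypothetical Python sketch mirrors the `WHERE` clause and the `getScn()` projection over an in-memory `sy$txlog`. The names and the Infinity value are assumptions.

```python
from typing import List, NamedTuple, Tuple

INFINITY = 9999999  # assumed stand-in for the "Infinity" SCN marker


class TxlogRow(NamedTuple):
    txn: int
    scn: int         # INFINITY until coalesce_log has run
    ora_rowscn: int  # commit SCN


def get_scn(scn: int, ora_rowscn: int) -> int:
    return ora_rowscn if scn == INFINITY else scn


def changed_since(txlog: List[TxlogRow], since_scn: int) -> List[Tuple[int, int]]:
    """(txn, scn) pairs selected by the WHERE clause, with scn computed as getScn() does."""
    return [
        (row.txn, get_scn(row.scn, row.ora_rowscn))
        for row in txlog
        if row.scn > since_scn and row.ora_rowscn > since_scn
    ]


txlog = [
    TxlogRow(txn=1, scn=900, ora_rowscn=900),        # old, coalesced
    TxlogRow(txn=2, scn=1010, ora_rowscn=1010),      # new, coalesced
    TxlogRow(txn=3, scn=INFINITY, ora_rowscn=1020),  # new, not coalesced yet
    TxlogRow(txn=4, scn=INFINITY, ora_rowscn=950),   # old, not coalesced yet
]
print(changed_since(txlog, 1000))  # [(2, 1010), (3, 1020)]
```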
Query with row chunking
- Configuration settings: chunkedScnThreshold > 0, useRowChunking = true
- Query parameters
  - `sinceScn`: the last SCN read from the relay/client
  - `rowsPerChunk`: chunk size (in `sy$txlog` rows)

```sql
SELECT scn, event_timestamp, src.*
FROM sy$<SOURCE_VIEW_NAME> src,
     (SELECT /*+ first_rows LEADING(tx) */
             sync_core.getScn(tx.scn, tx.ora_rowscn) scn,
             tx.ts event_timestamp,
             tx.txn,
             row_number() OVER (ORDER BY tx.scn) r
      FROM sy$txlog tx
      WHERE tx.scn > :sinceScn
        AND tx.ora_rowscn > :sinceScn
        AND tx.scn < 9999999999999999999999999999
     ) t
WHERE src.txn = t.txn
  AND r <= :rowsPerChunk  -- the limit counts sy$txlog rows, not source rows
ORDER BY r
```
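The chunked variant can be modelled the same way. In this hypothetical Python sketch (the names and the Infinity value are assumptions), rows that the coalesce job has not processed yet are excluded by the `tx.scn < ...` bound. The remaining rows are numbered in `scn` order, as `row_number() OVER (ORDER BY tx.scn)` does, and at most `rowsPerChunk` `sy$txlog` rows are kept. Because the limit applies to transactions, the join with the source view can return more rows than that. The sketch also assumes that the caller passes the last returned SCN as the next `sinceScn`.

```python
from typing import List, NamedTuple, Tuple

INFINITY = 9999999  # assumed stand-in for the "Infinity" SCN marker


class TxlogRow(NamedTuple):
    txn: int
    scn: int
    ora_rowscn: int


def next_chunk(txlog: List[TxlogRow], since_scn: int, rows_per_chunk: int) -> List[Tuple[int, int]]:
    """(txn, scn) pairs for one chunk, mirroring the inner query of the chunked SELECT."""
    candidates = [
        row for row in txlog
        if row.scn > since_scn and row.ora_rowscn > since_scn and row.scn < INFINITY
    ]
    candidates.sort(key=lambda row: row.scn)  # row_number() OVER (ORDER BY tx.scn)
    return [(row.txn, row.scn) for row in candidates[:rows_per_chunk]]


txlog = [
    TxlogRow(txn=1, scn=1001, ora_rowscn=1001),
    TxlogRow(txn=2, scn=1003, ora_rowscn=1003),
    TxlogRow(txn=3, scn=1002, ora_rowscn=1002),
    TxlogRow(txn=4, scn=INFINITY, ora_rowscn=1004),  # waits for the coalesce job
]

since = 1000
while True:
    chunk = next_chunk(txlog, since, rows_per_chunk=2)
    if not chunk:
        break
    print(chunk)
    since = chunk[-1][1]  # assumed: resume from the last SCN returned
# prints [(1, 1001), (3, 1002)] and then [(2, 1003)]
```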