diff --git a/src/include/firebird/FirebirdInterface.idl b/src/include/firebird/FirebirdInterface.idl index 9f87cc1126d..c2d067d6114 100644 --- a/src/include/firebird/FirebirdInterface.idl +++ b/src/include/firebird/FirebirdInterface.idl @@ -616,6 +616,9 @@ interface Replicator : ReferenceCounted version: // 4.0.0 => 4.0.1 [notImplementedAction if ::FB_UsedInYValve then defaultAction else call deprecatedClose(status) endif] void close(Status status); + +version: // 4.0 => 6.0 + void init(Status status, const string guid); } interface Request : ReferenceCounted diff --git a/src/include/firebird/IdlFbInterfaces.h b/src/include/firebird/IdlFbInterfaces.h index 50d5f9dd404..991670912fd 100644 --- a/src/include/firebird/IdlFbInterfaces.h +++ b/src/include/firebird/IdlFbInterfaces.h @@ -2322,7 +2322,7 @@ namespace Firebird } }; -#define FIREBIRD_IREPLICATOR_VERSION 4u +#define FIREBIRD_IREPLICATOR_VERSION 5u class IReplicator : public IReferenceCounted { @@ -2332,6 +2332,7 @@ namespace Firebird void (CLOOP_CARG *process)(IReplicator* self, IStatus* status, unsigned length, const unsigned char* data) CLOOP_NOEXCEPT; void (CLOOP_CARG *deprecatedClose)(IReplicator* self, IStatus* status) CLOOP_NOEXCEPT; void (CLOOP_CARG *close)(IReplicator* self, IStatus* status) CLOOP_NOEXCEPT; + void (CLOOP_CARG *init)(IReplicator* self, IStatus* status, const char* guid) CLOOP_NOEXCEPT; }; protected: @@ -2378,6 +2379,19 @@ namespace Firebird static_cast(this->cloopVTable)->close(this, status); StatusType::checkException(status); } + + template void init(StatusType* status, const char* guid) + { + if (cloopVTable->version < 5) + { + StatusType::setVersionError(status, "IReplicator", cloopVTable->version, 5); + StatusType::checkException(status); + return; + } + StatusType::clearException(status); + static_cast(this->cloopVTable)->init(this, status, guid); + StatusType::checkException(status); + } }; #define FIREBIRD_IREQUEST_VERSION 4u @@ -11362,6 +11376,7 @@ namespace Firebird this->process = &Name::cloopprocessDispatcher; this->deprecatedClose = &Name::cloopdeprecatedCloseDispatcher; this->close = &Name::cloopcloseDispatcher; + this->init = &Name::cloopinitDispatcher; } } vTable; @@ -11410,6 +11425,20 @@ namespace Firebird } } + static void CLOOP_CARG cloopinitDispatcher(IReplicator* self, IStatus* status, const char* guid) CLOOP_NOEXCEPT + { + StatusType status2(status); + + try + { + static_cast(self)->Name::init(&status2, guid); + } + catch (...) + { + StatusType::catchException(&status2); + } + } + static void CLOOP_CARG cloopaddRefDispatcher(IReferenceCounted* self) CLOOP_NOEXCEPT { try @@ -11452,6 +11481,7 @@ namespace Firebird virtual void process(StatusType* status, unsigned length, const unsigned char* data) = 0; virtual void deprecatedClose(StatusType* status) = 0; virtual void close(StatusType* status) = 0; + virtual void init(StatusType* status, const char* guid) = 0; }; template diff --git a/src/include/gen/Firebird.pas b/src/include/gen/Firebird.pas index 7c8ae81ce37..6e233303c63 100644 --- a/src/include/gen/Firebird.pas +++ b/src/include/gen/Firebird.pas @@ -380,6 +380,7 @@ ISC_TIMESTAMP_TZ_EX = record IReplicator_processPtr = procedure(this: IReplicator; status: IStatus; length: Cardinal; data: BytePtr); cdecl; IReplicator_deprecatedClosePtr = procedure(this: IReplicator; status: IStatus); cdecl; IReplicator_closePtr = procedure(this: IReplicator; status: IStatus); cdecl; + IReplicator_initPtr = procedure(this: IReplicator; status: IStatus; guid: PAnsiChar); cdecl; IRequest_receivePtr = procedure(this: IRequest; status: IStatus; level: Integer; msgType: Cardinal; length: Cardinal; message: Pointer); cdecl; IRequest_sendPtr = procedure(this: IRequest; status: IStatus; level: Integer; msgType: Cardinal; length: Cardinal; message: Pointer); cdecl; IRequest_getInfoPtr = procedure(this: IRequest; status: IStatus; level: Integer; itemsLength: Cardinal; items: BytePtr; bufferLength: Cardinal; buffer: BytePtr); cdecl; @@ -1699,14 +1700,16 @@ ReplicatorVTable = class(ReferenceCountedVTable) process: IReplicator_processPtr; deprecatedClose: IReplicator_deprecatedClosePtr; close: IReplicator_closePtr; + init: IReplicator_initPtr; end; IReplicator = class(IReferenceCounted) - const VERSION = 4; + const VERSION = 5; procedure process(status: IStatus; length: Cardinal; data: BytePtr); procedure deprecatedClose(status: IStatus); procedure close(status: IStatus); + procedure init(status: IStatus; guid: PAnsiChar); end; IReplicatorImpl = class(IReplicator) @@ -1717,6 +1720,7 @@ IReplicatorImpl = class(IReplicator) procedure process(status: IStatus; length: Cardinal; data: BytePtr); virtual; abstract; procedure deprecatedClose(status: IStatus); virtual; abstract; procedure close(status: IStatus); virtual; abstract; + procedure init(status: IStatus; guid: PAnsiChar); virtual; abstract; end; RequestVTable = class(ReferenceCountedVTable) @@ -7429,6 +7433,17 @@ procedure IReplicator.close(status: IStatus); FbException.checkException(status); end; +procedure IReplicator.init(status: IStatus; guid: PAnsiChar); +begin + if (vTable.version < 5) then begin + FbException.setVersionError(status, 'IReplicator', vTable.version, 5); + end + else begin + ReplicatorVTable(vTable).init(Self, status, guid); + end; + FbException.checkException(status); +end; + procedure IRequest.receive(status: IStatus; level: Integer; msgType: Cardinal; length: Cardinal; message: Pointer); begin RequestVTable(vTable).receive(Self, status, level, msgType, length, message); @@ -11986,6 +12001,15 @@ procedure IReplicatorImpl_closeDispatcher(this: IReplicator; status: IStatus); c end end; +procedure IReplicatorImpl_initDispatcher(this: IReplicator; status: IStatus; guid: PAnsiChar); cdecl; +begin + try + IReplicatorImpl(this).init(status, guid); + except + on e: Exception do FbException.catchException(status, e); + end +end; + var IReplicatorImpl_vTable: ReplicatorVTable; @@ -17576,12 +17600,13 @@ initialization IBatchCompletionStateImpl_vTable.getStatus := @IBatchCompletionStateImpl_getStatusDispatcher; IReplicatorImpl_vTable := ReplicatorVTable.create; - IReplicatorImpl_vTable.version := 4; + IReplicatorImpl_vTable.version := 5; IReplicatorImpl_vTable.addRef := @IReplicatorImpl_addRefDispatcher; IReplicatorImpl_vTable.release := @IReplicatorImpl_releaseDispatcher; IReplicatorImpl_vTable.process := @IReplicatorImpl_processDispatcher; IReplicatorImpl_vTable.deprecatedClose := @IReplicatorImpl_deprecatedCloseDispatcher; IReplicatorImpl_vTable.close := @IReplicatorImpl_closeDispatcher; + IReplicatorImpl_vTable.init := @IReplicatorImpl_initDispatcher; IRequestImpl_vTable := RequestVTable.create; IRequestImpl_vTable.version := 4; diff --git a/src/jrd/EngineInterface.h b/src/jrd/EngineInterface.h index 3a2adf4fcc6..7608c05c0a1 100644 --- a/src/jrd/EngineInterface.h +++ b/src/jrd/EngineInterface.h @@ -251,6 +251,7 @@ class JReplicator final : void process(Firebird::CheckStatusWrapper* status, unsigned length, const unsigned char* data) override; void close(Firebird::CheckStatusWrapper* status) override; void deprecatedClose(Firebird::CheckStatusWrapper* status) override; + void init(Firebird::CheckStatusWrapper* status, const char* guid) override; public: JReplicator(Applier* appl, StableAttachmentPart* sa); diff --git a/src/jrd/jrd.cpp b/src/jrd/jrd.cpp index e62d0e3a70b..5e7df8a39c9 100644 --- a/src/jrd/jrd.cpp +++ b/src/jrd/jrd.cpp @@ -6606,6 +6606,35 @@ void JReplicator::freeEngineData(Firebird::CheckStatusWrapper* user_status) } +void JReplicator::init(CheckStatusWrapper* status, const char* guid) +{ + try + { + EngineContextHolder tdbb(status, this, FB_FUNCTION); + check_database(tdbb); + + try + { + applier->init(tdbb, guid); + } + catch (const Exception& ex) + { + transliterateException(tdbb, ex, status, "JReplicator::init"); + return; + } + + trace_warning(tdbb, status, "JReplicator::init"); + } + catch (const Exception& ex) + { + ex.stuffException(status); + return; + } + + successful_completion(status); +} + + void JReplicator::process(CheckStatusWrapper* status, unsigned length, const UCHAR* data) { try diff --git a/src/jrd/replication/Applier.cpp b/src/jrd/replication/Applier.cpp index c9fae3bf2ea..9ce9a8ce15b 100644 --- a/src/jrd/replication/Applier.cpp +++ b/src/jrd/replication/Applier.cpp @@ -225,6 +225,22 @@ namespace } // namespace +void Applier::init(thread_db* tdbb, const char* guidStr) +{ + if (!guidStr) + raiseError("Source GUID is invalid"); + + const auto guid = Guid::fromString(guidStr); + + if (!guid) + raiseError("Source GUID is invalid"); + + if (m_sourceGuid && guid.value() != m_sourceGuid.value()) + raiseError("Source GUID mismatch"); + + m_currentGuid = guid; +} + Applier* Applier::create(thread_db* tdbb) { const auto dbb = tdbb->getDatabase(); @@ -260,10 +276,9 @@ Applier* Applier::create(thread_db* tdbb) } const auto config = dbb->replConfig(); - const bool cascade = (config && config->cascadeReplication); const auto applier = FB_NEW_POOL(*attachment->att_pool) - Applier(*attachment->att_pool, dbb->dbb_filename, request, cascade); + Applier(*attachment->att_pool, dbb->dbb_filename, config, request); attachment->att_repl_appliers.add(applier); diff --git a/src/jrd/replication/Applier.h b/src/jrd/replication/Applier.h index f549ca6e72a..fd2850eabef 100644 --- a/src/jrd/replication/Applier.h +++ b/src/jrd/replication/Applier.h @@ -125,14 +125,18 @@ namespace Jrd public: Applier(Firebird::MemoryPool& pool, const Firebird::PathName& database, - Request* request, bool cascade) + const Replication::Config* config, + Request* request) : PermanentStorage(pool), m_txnMap(pool), m_database(pool, database), - m_request(request), m_enableCascade(cascade) + m_request(request), + m_sourceGuid(config ? config->sourceGuid : std::nullopt), + m_enableCascade(config ? config->cascadeReplication : false) {} static Applier* create(thread_db* tdbb); + void init(thread_db* tdbb, const char* guidStr); void process(thread_db* tdbb, ULONG length, const UCHAR* data); void cleanupTransactions(thread_db* tdbb); void shutdown(thread_db* tdbb); @@ -155,6 +159,8 @@ namespace Jrd Record* m_record = nullptr; JReplicator* m_interface; const bool m_enableCascade; + const std::optional m_sourceGuid; + std::optional m_currentGuid; void startTransaction(thread_db* tdbb, TraNumber traNum); void prepareTransaction(thread_db* tdbb, TraNumber traNum); diff --git a/src/jrd/replication/Manager.cpp b/src/jrd/replication/Manager.cpp index 02cb061473b..3d262360fb7 100644 --- a/src/jrd/replication/Manager.cpp +++ b/src/jrd/replication/Manager.cpp @@ -151,10 +151,26 @@ Manager::Manager(const string& dbId, if (localStatus->getState() & IStatus::STATE_ERRORS) { logPrimaryStatus(m_config->dbName, &localStatus); - attachment->detach(&localStatus); + attachment->release(); continue; } + replicator->init(&localStatus, guid.toString().c_str()); + if (localStatus->getState() & IStatus::STATE_ERRORS) + { + const auto errorCode = localStatus->getErrors()[1]; + + if (errorCode != isc_interface_version_too_old && errorCode != isc_wish_list) + { + logPrimaryStatus(m_config->dbName, &localStatus); + replicator->release(); + attachment->release(); + continue; + } + + localStatus->init(); + } + m_replicas.add(FB_NEW_POOL(getPool()) SyncReplica(getPool(), attachment, replicator)); } diff --git a/src/jrd/replication/Manager.h b/src/jrd/replication/Manager.h index d38a3d4883e..b4ebab6a3bf 100644 --- a/src/jrd/replication/Manager.h +++ b/src/jrd/replication/Manager.h @@ -103,6 +103,7 @@ namespace Replication Firebird::Semaphore m_workingSemaphore; const Replication::Config* const m_config; + std::optional m_guid; Firebird::Array m_replicas; Firebird::Array m_buffers; Firebird::Mutex m_buffersMutex; diff --git a/src/remote/client/interface.cpp b/src/remote/client/interface.cpp index 7cd174ef99f..549e65e4dc2 100644 --- a/src/remote/client/interface.cpp +++ b/src/remote/client/interface.cpp @@ -623,6 +623,7 @@ class Replicator final : public RefCntIfacegetRdb(); + CHECK_HANDLE(rdb, isc_bad_db_handle); + rem_port* port = rdb->rdb_port; + + if (port->port_protocol < PROTOCOL_VERSION20) + unsupported(); + + PACKET* packet = &rdb->rdb_packet; + packet->p_operation = op_repl_init; + P_REPLICATE* repl = &packet->p_replicate; + repl->p_repl_database = rdb->rdb_id; + repl->p_repl_data.cstr_length = strlen(guid); + repl->p_repl_data.cstr_address = reinterpret_cast(guid); + + RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + + send_and_receive(status, rdb, packet); + } + catch (const Exception& ex) + { + ex.stuffException(status); + } +} + + void Replicator::process(CheckStatusWrapper* status, unsigned length, const unsigned char* data) { try diff --git a/src/remote/protocol.cpp b/src/remote/protocol.cpp index 414971d6e26..646dc39ce79 100644 --- a/src/remote/protocol.cpp +++ b/src/remote/protocol.cpp @@ -1136,6 +1136,7 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) } case op_repl_data: + case op_repl_init: { P_REPLICATE* repl = &p->p_replicate; MAP(xdr_short, reinterpret_cast(repl->p_repl_database)); diff --git a/src/remote/protocol.h b/src/remote/protocol.h index 4774e9c962d..aeefe256fb6 100644 --- a/src/remote/protocol.h +++ b/src/remote/protocol.h @@ -325,7 +325,7 @@ enum P_OP op_batch_set_bpb = 106, op_repl_data = 107, - op_repl_req = 108, + op_repl_init = 108, op_batch_cancel = 109, op_batch_sync = 110, diff --git a/src/remote/remote.h b/src/remote/remote.h index 57ab8108c00..c23a47e51bd 100644 --- a/src/remote/remote.h +++ b/src/remote/remote.h @@ -1631,7 +1631,7 @@ struct rem_port : public Firebird::GlobalStorage, public Firebird::RefCounted void batch_cancel(P_BATCH_FREE_CANCEL*, PACKET*); void batch_sync(PACKET*); void batch_bpb(P_BATCH_SETBPB*, PACKET*); - void replicate(P_REPLICATE*, PACKET*); + void replicate(P_REPLICATE*, PACKET*, bool); Firebird::string getRemoteId() const; void auxAcceptError(PACKET* packet); diff --git a/src/remote/server/ReplServer.cpp b/src/remote/server/ReplServer.cpp index 3b12feee393..f50a22f6b54 100644 --- a/src/remote/server/ReplServer.cpp +++ b/src/remote/server/ReplServer.cpp @@ -359,7 +359,7 @@ namespace bool checkGuid(const Guid& guid) { - return (!m_config->sourceGuid.has_value() || m_config->sourceGuid.value() == guid); + return (!m_config->sourceGuid || m_config->sourceGuid.value() == guid); } FB_UINT64 initReplica() @@ -411,12 +411,25 @@ namespace return m_sequence; } + void initGuid(const Guid& guid) + { + if (m_guid != guid) + { + FbLocalStatus localStatus; + m_replicator->init(&localStatus, guid.toString().c_str()); + localStatus.check(); + + m_guid = guid; + } + } + void shutdown() { m_replicator = nullptr; m_attachment = nullptr; m_sequence = 0; m_connected = false; + m_guid.reset(); } void replicate(FB_UINT64 sequence, ULONG offset, ULONG length, const UCHAR* data) @@ -499,6 +512,7 @@ namespace string m_lastError; FB_UINT64 m_errorSequence; ULONG m_errorOffset; + std::optional m_guid; }; typedef Array TargetList; @@ -847,6 +861,8 @@ namespace raiseError("Journal file %s open failed (error: %d)", segment->filename.c_str(), ERRNO); } + target->initGuid(guid); + const TimeStamp startTime(TimeStamp::getCurrentTimeStamp()); AutoFile file(fd); diff --git a/src/remote/server/server.cpp b/src/remote/server/server.cpp index 1f823ba0c27..3b688ae4a6b 100644 --- a/src/remote/server/server.cpp +++ b/src/remote/server/server.cpp @@ -3853,7 +3853,7 @@ void rem_port::batch_sync(PACKET* sendL) } -void rem_port::replicate(P_REPLICATE* repl, PACKET* sendL) +void rem_port::replicate(P_REPLICATE* repl, PACKET* sendL, bool initialize) { LocalStatus ls; CheckStatusWrapper status_vector(&ls); @@ -3872,7 +3872,12 @@ void rem_port::replicate(P_REPLICATE* repl, PACKET* sendL) fb_assert(this->port_replicator); } - if (repl->p_repl_data.cstr_length) + if (initialize) + { + const string guid(repl->p_repl_data.cstr_address, repl->p_repl_data.cstr_length); + this->port_replicator->init(&status_vector, guid.c_str()); + } + else if (repl->p_repl_data.cstr_length) { this->port_replicator->process(&status_vector, repl->p_repl_data.cstr_length, repl->p_repl_data.cstr_address); @@ -5348,7 +5353,8 @@ static bool process_packet(rem_port* port, PACKET* sendL, PACKET* receive, rem_p break; case op_repl_data: - port->replicate(&receive->p_replicate, sendL); + case op_repl_init: + port->replicate(&receive->p_replicate, sendL, (op == op_repl_init)); break; ///case op_insert: diff --git a/src/yvalve/YObjects.h b/src/yvalve/YObjects.h index b2a93839ba1..eefda4f46b8 100644 --- a/src/yvalve/YObjects.h +++ b/src/yvalve/YObjects.h @@ -421,6 +421,7 @@ class YReplicator final : void process(Firebird::CheckStatusWrapper* status, unsigned length, const unsigned char* data); void close(Firebird::CheckStatusWrapper* status); void deprecatedClose(Firebird::CheckStatusWrapper* status); + void init(Firebird::CheckStatusWrapper* status, const char* guid); public: AtomicAttPtr attachment; diff --git a/src/yvalve/why.cpp b/src/yvalve/why.cpp index 8625a2c97b8..8024f1c4313 100644 --- a/src/yvalve/why.cpp +++ b/src/yvalve/why.cpp @@ -5244,6 +5244,20 @@ void YReplicator::destroy(unsigned dstrFlags) } +void YReplicator::init(CheckStatusWrapper* status, const char* guid) +{ + try + { + YEntry entry(status, this); + entry.next()->init(status, guid); + } + catch (const Exception& e) + { + e.stuffException(status); + } +} + + void YReplicator::process(CheckStatusWrapper* status, unsigned length, const unsigned char* data) { try