diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py
index 0a4f362e7..48d82b4f7 100644
--- a/hathor/p2p/sync_v2/agent.py
+++ b/hathor/p2p/sync_v2/agent.py
@@ -27,17 +27,21 @@
 from hathor.conf.get_settings import get_settings
 from hathor.p2p.messages import ProtocolMessages
 from hathor.p2p.sync_agent import SyncAgent
+from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient, StreamingError
 from hathor.p2p.sync_v2.mempool import SyncMempoolManager
+from hathor.p2p.sync_v2.payloads import GetBestBlockPayload, GetNextBlocksPayload, GetTransactionsBFSPayload
 from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, BlockchainStreaming, StreamEnd, TransactionsStreaming
+from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient
 from hathor.transaction import BaseTransaction, Block, Transaction
 from hathor.transaction.base_transaction import tx_or_block_from_bytes
 from hathor.transaction.exceptions import HathorError
 from hathor.transaction.storage.exceptions import TransactionDoesNotExist
 from hathor.types import VertexId
-from hathor.util import Reactor, collect_n
+from hathor.util import Reactor, collect_n, not_none
 
 if TYPE_CHECKING:
     from hathor.p2p.protocol import HathorProtocol
+    from hathor.transaction.storage import TransactionStorage
 
 logger = get_logger()
 
@@ -82,7 +86,7 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
         self._settings = get_settings()
         self.protocol = protocol
         self.manager = protocol.node
-        self.tx_storage = protocol.node.tx_storage
+        self.tx_storage: 'TransactionStorage' = protocol.node.tx_storage
         self.state = PeerState.UNKNOWN
 
         self.DEFAULT_STREAMING_LIMIT = DEFAULT_STREAMING_LIMIT
@@ -97,11 +101,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
         # Create logger with context
         self.log = logger.new(peer=self.protocol.get_short_peer_id())
 
-        # Extra
-        self._blk_size = 0
-        self._blk_end_hash = self._settings.GENESIS_BLOCK_HASH
-        self._blk_max_quantity = 0
-
         # indicates whether we're receiving a stream from the peer
         self.receiving_stream = False
 
@@ -117,14 +116,9 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
         self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
         self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None
 
-        # Deferreds used when we are receiving a streaming of vertices.
-        self._deferred_blockchain_streaming: Optional[Deferred[None]] = None
-        self._deferred_transactions_streaming: Optional[Deferred[None]] = None
-
-        # When syncing blocks we start streaming with all peers
-        # so the moment I get some repeated blocks, I stop the download
-        # because it's probably a streaming that I've just received
-        self.max_repeated_blocks = 10
+        # Clients to handle streaming messages.
+        self._blk_streaming_client: Optional[BlockchainStreamingClient] = None
+        self._tx_streaming_client: Optional[TransactionStreamingClient] = None
 
         # Streaming objects
         self.blockchain_streaming: Optional[BlockchainStreaming] = None
@@ -137,11 +131,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
         # Indicate whether the sync manager has been started.
         self._started: bool = False
 
-        # Saves the last received block from the block streaming # this is useful to be used when running the sync of
-        # transactions in the case when I am downloading a side chain. Starts at the genesis, which is common to all
-        # peers on the network
-        self._last_received_block: Optional[Block] = None
-
         # Saves if I am in the middle of a mempool sync
         # we don't execute any sync while in the middle of it
         self.mempool_manager = SyncMempoolManager(self)
@@ -159,9 +148,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
         # Whether we propagate transactions or not
         self._is_relaying = False
 
-        # This stores the final height that we expect the last "get blocks" stream to end on
-        self._blk_end_height: Optional[int] = None
-
         # Whether to sync with this peer
         self._is_enabled: bool = False
 
@@ -328,7 +314,7 @@ def run_sync_transactions(self) -> Generator[Any, Any, None]:
                                   MAX_GET_TRANSACTIONS_BFS_LEN)
 
         # Start with the last received block and find the best block full validated in its chain
-        block = self._last_received_block
+        block = self._blk_streaming_client._last_received_block if self._blk_streaming_client else None
         if block is None:
             block = cast(Block, self.tx_storage.get_genesis(self._settings.GENESIS_BLOCK_HASH))
         else:
@@ -341,7 +327,7 @@ def run_sync_transactions(self) -> Generator[Any, Any, None]:
 
         self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(),
                       end_block_height=block_height)
-        yield self.start_transactions_streaming(needed_txs, block.hash)
+        yield self.start_transactions_streaming(needed_txs, block.hash, block.hash)
 
     def get_my_best_block(self) -> _HeightInfo:
         """Return my best block info."""
@@ -407,10 +393,12 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
                        synced_block=self.synced_block)
 
         # Sync from common block
-        yield self.start_blockchain_streaming(self.synced_block.id,
-                                              self.synced_block.height,
-                                              self.peer_best_block.id,
-                                              self.peer_best_block.height)
+        try:
+            yield self.start_blockchain_streaming(self.synced_block,
+                                                  self.peer_best_block)
+        except StreamingError:
+            self.send_stop_block_streaming()
+            self.receiving_stream = False
 
         return False
 
@@ -495,36 +483,14 @@ def handle_relay(self, payload: str) -> None:
                 self.protocol.send_error_and_close_connection('RELAY: invalid value')
                 return
 
-    def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash: bytes, end_height: int,
-                               reverse: bool) -> None:
-        """ Common setup before starting an outgoing block stream.
-        """
-        self._blk_start_hash = start_hash
-        self._blk_start_height = start_height
-        self._blk_end_hash = end_hash
-        self._blk_end_height = end_height
-        self._blk_received = 0
-        self._blk_repeated = 0
-        raw_quantity = end_height - start_height + 1
-        self._blk_max_quantity = -raw_quantity if reverse else raw_quantity
-        self._blk_prev_hash: Optional[bytes] = None
-        self._blk_stream_reverse = reverse
-        self._last_received_block = None
-
     def start_blockchain_streaming(self,
-                                   start_hash: bytes,
-                                   start_height: int,
-                                   end_hash: bytes,
-                                   end_height: int) -> Deferred[None]:
+                                   start_block: _HeightInfo,
+                                   end_block: _HeightInfo) -> Deferred[StreamEnd]:
         """Request peer to start streaming blocks to us."""
-        assert self._deferred_blockchain_streaming is None
-        self._setup_block_streaming(start_hash, start_height, end_hash, end_height, False)
-        quantity = end_height - start_height
-        self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity,
-                      start_hash=start_hash.hex(), end_hash=end_hash.hex())
-        self.send_get_next_blocks(start_hash, end_hash, quantity)
-        self._deferred_blockchain_streaming = Deferred()
-        return self._deferred_blockchain_streaming
+        self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block)
+        quantity = self._blk_streaming_client._blk_max_quantity
+        self.send_get_next_blocks(start_block.id, end_block.id, quantity)
+        return self._blk_streaming_client.run()
 
     def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
         """ Helper to send a message.
@@ -644,12 +610,12 @@ def handle_peer_block_hashes(self, payload: str) -> None:
     def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None:
         """ Send a PEER-BLOCK-HASHES message.
         """
-        payload = json.dumps(dict(
-            start_hash=start_hash.hex(),
-            end_hash=end_hash.hex(),
+        payload = GetNextBlocksPayload(
+            start_hash=start_hash,
+            end_hash=end_hash,
             quantity=quantity,
-        ))
-        self.send_message(ProtocolMessages.GET_NEXT_BLOCKS, payload)
+        )
+        self.send_message(ProtocolMessages.GET_NEXT_BLOCKS, payload.json())
         self.receiving_stream = True
 
     def handle_get_next_blocks(self, payload: str) -> None:
@@ -659,11 +625,11 @@ def handle_get_next_blocks(self, payload: str) -> None:
         if self._is_streaming:
             self.protocol.send_error_and_close_connection('GET-NEXT-BLOCKS received before previous one finished')
             return
-        data = json.loads(payload)
+        data = GetNextBlocksPayload.parse_raw(payload)
         self.send_next_blocks(
-            start_hash=bytes.fromhex(data['start_hash']),
-            end_hash=bytes.fromhex(data['end_hash']),
-            quantity=data['quantity'],
+            start_hash=data.start_hash,
+            end_hash=data.end_hash,
+            quantity=data.quantity,
         )
 
     def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None:
@@ -719,7 +685,7 @@ def handle_blocks_end(self, payload: str) -> None:
         This is important to know that the other peer will not send any BLOCKS messages anymore as a response to a
         previous command.
         """
-        self.log.debug('recv BLOCKS-END', payload=payload, size=self._blk_size)
+        self.log.debug('recv BLOCKS-END', payload=payload)
 
         response_code = StreamEnd(int(payload))
         self.receiving_stream = False
@@ -730,10 +696,8 @@ def handle_blocks_end(self, payload: str) -> None:
             self.protocol.send_error_and_close_connection('Not expecting to receive BLOCKS-END message')
             return
 
-        assert self._deferred_blockchain_streaming is not None
-        self._deferred_blockchain_streaming.callback(None)
-        self._deferred_blockchain_streaming = None
-
+        assert self._blk_streaming_client is not None
+        self._blk_streaming_client.handle_blocks_end(response_code)
         self.log.debug('block streaming ended', reason=str(response_code))
 
     def handle_blocks(self, payload: str) -> None:
@@ -752,74 +716,10 @@ def handle_blocks(self, payload: str) -> None:
             # Not a block. Punish peer?
             return
         blk.storage = self.tx_storage
-
         assert blk.hash is not None
 
-        self._blk_received += 1
-        if self._blk_received > self._blk_max_quantity + 1:
-            self.log.warn('too many blocks received',
-                          blk_received=self._blk_received,
-                          blk_max_quantity=self._blk_max_quantity,
-                          last_block=blk.hash_hex)
-            # Too many blocks. Punish peer?
-            self.state = PeerState.ERROR
-            return
-
-        if self.partial_vertex_exists(blk.hash):
-            # We reached a block we already have. Skip it.
-            self._blk_prev_hash = blk.hash
-            self._blk_repeated += 1
-            if self.receiving_stream and self._blk_repeated > self.max_repeated_blocks:
-                self.log.debug('repeated block received', total_repeated=self._blk_repeated)
-                self.handle_many_repeated_blocks()
-
-        # basic linearity validation, crucial for correctly predicting the next block's height
-        if self._blk_stream_reverse:
-            if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash():
-                self.handle_invalid_block('received block is not parent of previous block')
-                return
-        else:
-            if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash:
-                self.handle_invalid_block('received block is not child of previous block')
-                return
-
-        try:
-            # this methods takes care of checking if the block already exists,
-            # it will take care of doing at least a basic validation
-            # self.log.debug('add new block', block=blk.hash_hex)
-            if self.partial_vertex_exists(blk.hash):
-                # XXX: early terminate?
-                self.log.debug('block early terminate?', blk_id=blk.hash.hex())
-            else:
-                self.log.debug('block received', blk_id=blk.hash.hex())
-            self.on_new_tx(blk, propagate_to_peers=False, quiet=True)
-        except HathorError:
-            self.handle_invalid_block(exc_info=True)
-            return
-        else:
-            self._last_received_block = blk
-            self._blk_repeated = 0
-            # XXX: debugging log, maybe add timing info
-            if self._blk_received % 500 == 0:
-                self.log.debug('block streaming in progress', blocks_received=self._blk_received)
-
-    def handle_invalid_block(self, msg: Optional[str] = None, *, exc_info: bool = False) -> None:
-        """ Call this method when receiving an invalid block.
-        """
-        kwargs: dict[str, Any] = {}
-        if msg is not None:
-            kwargs['error'] = msg
-        if exc_info:
-            kwargs['exc_info'] = True
-        self.log.warn('invalid new block', **kwargs)
-        # Invalid block?!
-        self.state = PeerState.ERROR
-
-    def handle_many_repeated_blocks(self) -> None:
-        """ Call this when a stream sends too many blocks in sequence that we already have.
-        """
-        self.send_stop_block_streaming()
-        self.receiving_stream = False
+        assert self._blk_streaming_client is not None
+        self._blk_streaming_client.handle_blocks(blk)
 
     def send_stop_block_streaming(self) -> None:
         """ Send a STOP-BLOCK-STREAMING message.
@@ -856,43 +756,42 @@ def send_get_best_block(self) -> None:
         """
         self.send_message(ProtocolMessages.GET_BEST_BLOCK)
 
-    def handle_get_best_block(self, payload: str) -> None:
+    def handle_get_best_block(self, _payload: str) -> None:
         """ Handle a GET-BEST-BLOCK message.
         """
         best_block = self.tx_storage.get_best_block()
         meta = best_block.get_metadata()
         assert meta.validation.is_fully_connected()
-        data = {'block': best_block.hash_hex, 'height': meta.height}
-        self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))
+        payload = GetBestBlockPayload(
+            block=not_none(best_block.hash),
+            height=not_none(meta.height),
+        )
+        self.send_message(ProtocolMessages.BEST_BLOCK, payload.json())
 
     def handle_best_block(self, payload: str) -> None:
         """ Handle a BEST-BLOCK message.
         """
-        data = json.loads(payload)
-        _id = bytes.fromhex(data['block'])
-        height = data['height']
-        best_block = _HeightInfo(height=height, id=_id)
+        data = GetBestBlockPayload.parse_raw(payload)
+        best_block = _HeightInfo(height=data.height, id=data.block)
 
         deferred = self._deferred_best_block
         self._deferred_best_block = None
         if deferred:
             deferred.callback(best_block)
 
-    def _setup_tx_streaming(self):
-        """ Common setup before starting an outgoing transaction stream.
-        """
-        self._tx_received = 0
-        self._tx_max_quantity = DEFAULT_STREAMING_LIMIT  # XXX: maybe this is redundant
-        # XXX: what else can we add for checking if everything is going well?
-
-    def start_transactions_streaming(self, start_from: list[bytes], until_first_block: bytes) -> Deferred[None]:
+    def start_transactions_streaming(self,
+                                     start_from: list[bytes],
+                                     first_block_hash: bytes,
+                                     last_block_hash: bytes) -> Deferred[StreamEnd]:
         """Request peer to start streaming transactions to us."""
-        assert self._deferred_transactions_streaming is None
-        self.send_get_transactions_bfs(start_from, until_first_block)
-        self._deferred_transactions_streaming = Deferred()
-        return self._deferred_transactions_streaming
-
-    def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None:
+        self._tx_streaming_client = TransactionStreamingClient(self, start_from, first_block_hash, last_block_hash)
+        self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash)
+        return self._tx_streaming_client.run()
+
+    def send_get_transactions_bfs(self,
+                                  start_from: list[bytes],
+                                  first_block_hash: bytes,
+                                  last_block_hash: bytes) -> None:
         """ Send a GET-TRANSACTIONS-BFS message.
 
         This will request a BFS of all transactions starting from start_from list and walking back into parents/inputs.
@@ -904,15 +803,19 @@ def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block:
         height of until_first_block. The other peer will return an empty response if it doesn't have any of the
         transactions in start_from or if it doesn't have the until_first_block block.
         """
-        self._setup_tx_streaming()
         start_from_hexlist = [tx.hex() for tx in start_from]
-        until_first_block_hex = until_first_block.hex()
-        self.log.debug('send_get_transactions_bfs', start_from=start_from_hexlist, last_block=until_first_block_hex)
-        payload = json.dumps(dict(
-            start_from=start_from_hexlist,
-            until_first_block=until_first_block_hex,
-        ))
-        self.send_message(ProtocolMessages.GET_TRANSACTIONS_BFS, payload)
+        first_block_hash_hex = first_block_hash.hex()
+        last_block_hash_hex = last_block_hash.hex()
+        self.log.debug('send_get_transactions_bfs',
+                       start_from=start_from_hexlist,
+                       first_block_hash=first_block_hash_hex,
+                       last_block_hash=last_block_hash_hex)
+        payload = GetTransactionsBFSPayload(
+            start_from=start_from,
+            first_block_hash=first_block_hash,
+            last_block_hash=last_block_hash,
+        )
+        self.send_message(ProtocolMessages.GET_TRANSACTIONS_BFS, payload.json())
         self.receiving_stream = True
 
     def handle_get_transactions_bfs(self, payload: str) -> None:
@@ -921,19 +824,18 @@ def handle_get_transactions_bfs(self, payload: str) -> None:
         if self._is_streaming:
             self.log.warn('ignore GET-TRANSACTIONS-BFS, already streaming')
             return
-        data = json.loads(payload)
+        data = GetTransactionsBFSPayload.parse_raw(payload)
         # XXX: todo verify this limit while parsing the payload.
-        start_from = data['start_from']
-        if len(start_from) > MAX_GET_TRANSACTIONS_BFS_LEN:
+        if len(data.start_from) > MAX_GET_TRANSACTIONS_BFS_LEN:
             self.log.error('too many transactions in GET-TRANSACTIONS-BFS', state=self.state)
             self.protocol.send_error_and_close_connection('Too many transactions in GET-TRANSACTIONS-BFS')
             return
-        self.log.debug('handle_get_transactions_bfs', **data)
-        start_from = [bytes.fromhex(tx_hash_hex) for tx_hash_hex in start_from]
-        until_first_block = bytes.fromhex(data['until_first_block'])
-        self.send_transactions_bfs(start_from, until_first_block)
+        self.send_transactions_bfs(data.start_from, data.first_block_hash, data.last_block_hash)
 
-    def send_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None:
+    def send_transactions_bfs(self,
+                              start_from: list[bytes],
+                              first_block_hash: bytes,
+                              last_block_hash: bytes) -> None:
         """ Start a transactions BFS stream.
         """
         start_from_txs = []
@@ -945,14 +847,22 @@ def send_transactions_bfs(self, start_from: list[bytes], until_first_block: byte
                 self.log.debug('requested start_from_hash not found', start_from_hash=start_from_hash.hex())
                 self.send_message(ProtocolMessages.NOT_FOUND, start_from_hash.hex())
                 return
-        if not self.tx_storage.transaction_exists(until_first_block):
+        if not self.tx_storage.transaction_exists(first_block_hash):
+            # In case the tx does not exist we send a NOT-FOUND message
+            self.log.debug('requested first_block_hash not found', first_block_hash=first_block_hash.hex())
+            self.send_message(ProtocolMessages.NOT_FOUND, first_block_hash.hex())
+            return
+        if not self.tx_storage.transaction_exists(last_block_hash):
             # In case the tx does not exist we send a NOT-FOUND message
-            self.log.debug('requested until_first_block not found', until_first_block=until_first_block.hex())
-            self.send_message(ProtocolMessages.NOT_FOUND, until_first_block.hex())
+            self.log.debug('requested last_block_hash not found', last_block_hash=last_block_hash.hex())
+            self.send_message(ProtocolMessages.NOT_FOUND, last_block_hash.hex())
             return
         if self.transactions_streaming is not None and self.transactions_streaming.is_running:
             self.transactions_streaming.stop()
-        self.transactions_streaming = TransactionsStreaming(self, start_from_txs, until_first_block,
+        self.transactions_streaming = TransactionsStreaming(self,
+                                                            start_from_txs,
+                                                            first_block_hash,
+                                                            last_block_hash,
                                                             limit=self.DEFAULT_STREAMING_LIMIT)
         self.transactions_streaming.start()
 
@@ -973,7 +883,7 @@ def send_transactions_end(self, response_code: StreamEnd) -> None:
     def handle_transactions_end(self, payload: str) -> None:
         """ Handle a TRANSACTIONS-END message.
         """
-        self.log.debug('recv TRANSACTIONS-END', payload=payload, size=self._blk_size)
+        self.log.debug('recv TRANSACTIONS-END', payload=payload)
 
         response_code = StreamEnd(int(payload))
         self.receiving_stream = False
@@ -984,10 +894,8 @@ def handle_transactions_end(self, payload: str) -> None:
             self.protocol.send_error_and_close_connection('Not expecting to receive TRANSACTIONS-END message')
             return
 
-        assert self._deferred_transactions_streaming is not None
-        self._deferred_transactions_streaming.callback(None)
-        self._deferred_transactions_streaming = None
-
+        assert self._tx_streaming_client is not None
+        self._tx_streaming_client.handle_transactions_end(response_code)
         self.log.debug('transaction streaming ended', reason=str(response_code))
 
     def handle_transaction(self, payload: str) -> None:
@@ -1004,33 +912,8 @@ def handle_transaction(self, payload: str) -> None:
             # Not a transaction. Punish peer?
             return
 
-        self._tx_received += 1
-        if self._tx_received > self._tx_max_quantity + 1:
-            self.log.warn('too many txs received')
-            self.state = PeerState.ERROR
-            return
-
-        try:
-            # this methods takes care of checking if the tx already exists, it will take care of doing at least
-            # a basic validation
-            # self.log.debug('add new tx', tx=tx.hash_hex)
-            if self.partial_vertex_exists(tx.hash):
-                # XXX: early terminate?
-                self.log.debug('tx early terminate?', tx_id=tx.hash.hex())
-            else:
-                self.log.debug('tx received', tx_id=tx.hash.hex())
-            self.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True)
-        except HathorError:
-            self.log.warn('invalid new tx', exc_info=True)
-            # Invalid block?!
-            # Invalid transaction?!
-            # Maybe stop syncing and punish peer.
-            self.state = PeerState.ERROR
-            return
-        else:
-            # XXX: debugging log, maybe add timing info
-            if self._tx_received % 100 == 0:
-                self.log.debug('tx streaming in progress', txs_received=self._tx_received)
+        assert self._tx_streaming_client is not None
+        self._tx_streaming_client.handle_transaction(tx)
 
     @inlineCallbacks
     def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]:
diff --git a/hathor/p2p/sync_v2/blockchain_streaming_client.py b/hathor/p2p/sync_v2/blockchain_streaming_client.py
new file mode 100644
index 000000000..bf4b805f6
--- /dev/null
+++ b/hathor/p2p/sync_v2/blockchain_streaming_client.py
@@ -0,0 +1,140 @@
+# Copyright 2023 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Optional
+
+from structlog import get_logger
+from twisted.internet.defer import Deferred
+
+from hathor.p2p.sync_v2.exception import (
+    BlockNotConnectedToPreviousBlock,
+    InvalidVertexError,
+    StreamingError,
+    TooManyRepeatedVerticesError,
+    TooManyVerticesReceivedError,
+)
+from hathor.p2p.sync_v2.streamers import StreamEnd
+from hathor.transaction import Block
+from hathor.transaction.exceptions import HathorError
+from hathor.types import VertexId
+
+if TYPE_CHECKING:
+    from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
+
+logger = get_logger()
+
+
+class BlockchainStreamingClient:
+    def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None:
+        self.sync_agent = sync_agent
+        self.protocol = self.sync_agent.protocol
+        self.tx_storage = self.sync_agent.tx_storage
+        self.manager = self.sync_agent.manager
+
+        self.log = logger.new(peer=self.protocol.get_short_peer_id())
+
+        self.start_block = start_block
+        self.end_block = end_block
+
+        # When syncing blocks we start streaming with all peers
+        # so the moment I get some repeated blocks, I stop the download
+        # because it's probably a streaming that I've already received
+        self.max_repeated_blocks = 10
+
+        self._deferred: Deferred[StreamEnd] = Deferred()
+
+        self._blk_received: int = 0
+        self._blk_repeated: int = 0
+
+        self._blk_max_quantity = self.end_block.height - self.start_block.height + 1
+        self._reverse: bool = False
+        if self._blk_max_quantity < 0:
+            self._blk_max_quantity = -self._blk_max_quantity
+            self._reverse = True
+
+        self._last_received_block: Optional[Block] = None
+
+        self._partial_blocks: list[Block] = []
+
+    def run(self) -> Deferred[StreamEnd]:
+        return self._deferred
+
+    def fails(self, reason: 'StreamingError') -> None:
+        self.sync_agent.send_stop_block_streaming()
+        self._deferred.errback(reason)
+
+    def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
+        """ Return true if the vertex exists no matter its validation state.
+        """
+        with self.tx_storage.allow_partially_validated_context():
+            return self.tx_storage.transaction_exists(vertex_id)
+
+    def handle_blocks(self, blk: Block) -> None:
+        if self._deferred.called:
+            return
+
+        self._blk_received += 1
+        if self._blk_received > self._blk_max_quantity:
+            self.log.warn('too many blocks received',
+                          blk_received=self._blk_received,
+                          blk_max_quantity=self._blk_max_quantity)
+            self.fails(TooManyVerticesReceivedError())
+            return
+
+        assert blk.hash is not None
+        is_duplicated = False
+        if self.partial_vertex_exists(blk.hash):
+            # We reached a block we already have. Skip it.
+            self._blk_repeated += 1
+            is_duplicated = True
+            if self._blk_repeated > self.max_repeated_blocks:
+                self.log.debug('too many repeated block received', total_repeated=self._blk_repeated)
+                self.fails(TooManyRepeatedVerticesError())
+
+        # basic linearity validation, crucial for correctly predicting the next block's height
+        if self._reverse:
+            if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash():
+                self.fails(BlockNotConnectedToPreviousBlock())
+                return
+        else:
+            if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash:
+                self.fails(BlockNotConnectedToPreviousBlock())
+                return
+
+        try:
+            # this methods takes care of checking if the block already exists,
+            # it will take care of doing at least a basic validation
+            if is_duplicated:
+                self.log.debug('block early terminate?', blk_id=blk.hash.hex())
+            else:
+                self.log.debug('block received', blk_id=blk.hash.hex())
+            self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True)
+        except HathorError:
+            self.fails(InvalidVertexError())
+            return
+        else:
+            self._last_received_block = blk
+            self._blk_repeated = 0
+            # XXX: debugging log, maybe add timing info
+            if self._blk_received % 500 == 0:
+                self.log.debug('block streaming in progress', blocks_received=self._blk_received)
+
+            meta = blk.get_metadata()
+            if not meta.validation.is_fully_connected():
+                self._partial_blocks.append(blk)
+
+    def handle_blocks_end(self, response_code: StreamEnd) -> None:
+        if self._deferred.called:
+            return
+        self._deferred.callback(response_code)
diff --git a/hathor/p2p/sync_v2/exception.py b/hathor/p2p/sync_v2/exception.py
new file mode 100644
index 000000000..1d9ec3356
--- /dev/null
+++ b/hathor/p2p/sync_v2/exception.py
@@ -0,0 +1,32 @@
+# Copyright 2023 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class StreamingError(Exception):
+    pass
+
+
+class TooManyVerticesReceivedError(StreamingError):
+    pass
+
+
+class TooManyRepeatedVerticesError(StreamingError):
+    pass
+
+
+class BlockNotConnectedToPreviousBlock(StreamingError):
+    pass
+
+
+class InvalidVertexError(StreamingError):
+    pass
diff --git a/hathor/p2p/sync_v2/payloads.py b/hathor/p2p/sync_v2/payloads.py
new file mode 100644
index 000000000..ff255860d
--- /dev/null
+++ b/hathor/p2p/sync_v2/payloads.py
@@ -0,0 +1,66 @@
+# Copyright 2023 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pydantic import validator
+
+from hathor.utils.pydantic import BaseModel
+
+
+class PayloadBaseModel(BaseModel):
+
+    @classmethod
+    def convert_hex_to_bytes(cls, value: str | bytes) -> bytes:
+        if isinstance(value, str):
+            return bytes.fromhex(value)
+        elif isinstance(value, bytes):
+            return value
+        raise ValueError('invalid type')
+
+    class Config:
+        json_encoders = {
+            bytes: lambda x: x.hex()
+        }
+
+
+class GetNextBlocksPayload(PayloadBaseModel):
+    start_hash: bytes
+    end_hash: bytes
+    quantity: int
+
+    @validator('start_hash', 'end_hash', pre=True)
+    def validate_bytes_fields(cls, value: str | bytes) -> bytes:
+        return cls.convert_hex_to_bytes(value)
+
+
+class GetBestBlockPayload(PayloadBaseModel):
+    block: bytes
+    height: int
+
+    @validator('block', pre=True)
+    def validate_bytes_fields(cls, value: str | bytes) -> bytes:
+        return cls.convert_hex_to_bytes(value)
+
+
+class GetTransactionsBFSPayload(PayloadBaseModel):
+    start_from: list[bytes]
+    first_block_hash: bytes
+    last_block_hash: bytes
+
+    @validator('first_block_hash', 'last_block_hash', pre=True)
+    def validate_bytes_fields(cls, value: str | bytes) -> bytes:
+        return cls.convert_hex_to_bytes(value)
+
+    @validator('start_from', pre=True)
+    def validate_start_from(cls, values: list[str | bytes]) -> list[bytes]:
+        return [cls.convert_hex_to_bytes(x) for x in values]
diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py
index 1c8fac80e..46d508b03 100644
--- a/hathor/p2p/sync_v2/streamers.py
+++ b/hathor/p2p/sync_v2/streamers.py
@@ -190,8 +190,13 @@ class TransactionsStreaming(_StreamingBase):
     """Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
     """
 
-    def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction], last_block_hash: bytes,
-                 *, limit: int = DEFAULT_STREAMING_LIMIT):
+    def __init__(self,
+                 node_sync: 'NodeBlockSync',
+                 start_from: list[BaseTransaction],
+                 first_block_hash: bytes,
+                 last_block_hash: bytes,
+                 *,
+                 limit: int = DEFAULT_STREAMING_LIMIT) -> None:
         # XXX: is limit needed for tx streaming? Or let's always send all txs for
         # a block? Very unlikely we'll reach this limit
         super().__init__(node_sync, limit=limit)
@@ -199,6 +204,7 @@ def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction]
         assert len(start_from) > 0
         assert start_from[0].storage is not None
         self.storage = start_from[0].storage
+        self.first_block_hash = first_block_hash
         self.last_block_hash = last_block_hash
         self.last_block_height = 0
 
diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py
new file mode 100644
index 000000000..45e2f48b7
--- /dev/null
+++ b/hathor/p2p/sync_v2/transaction_streaming_client.py
@@ -0,0 +1,118 @@
+# Copyright 2023 Hathor Labs
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING
+
+from structlog import get_logger
+from twisted.internet.defer import Deferred
+
+from hathor.p2p.sync_v2.exception import (
+    InvalidVertexError,
+    StreamingError,
+    TooManyRepeatedVerticesError,
+    TooManyVerticesReceivedError,
+)
+from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, StreamEnd
+from hathor.transaction import BaseTransaction
+from hathor.transaction.exceptions import HathorError
+from hathor.types import VertexId
+
+if TYPE_CHECKING:
+    from hathor.p2p.sync_v2.agent import NodeBlockSync
+
+logger = get_logger()
+
+
+class TransactionStreamingClient:
+    def __init__(self,
+                 sync_agent: 'NodeBlockSync',
+                 start_from: list[bytes],
+                 start_block: bytes,
+                 end_block: bytes) -> None:
+        self.sync_agent = sync_agent
+        self.protocol = self.sync_agent.protocol
+        self.tx_storage = self.sync_agent.tx_storage
+        self.manager = self.sync_agent.manager
+
+        self.log = logger.new(peer=self.protocol.get_short_peer_id())
+
+        self.start_from = start_from
+        self.start_block = start_block
+        self.end_block = end_block
+
+        # Let's keep it at "infinity" until a known issue is fixed.
+        self.max_repeated_transactions = 1_000_000
+
+        self._deferred: Deferred[StreamEnd] = Deferred()
+
+        self._tx_received: int = 0
+        self._tx_repeated: int = 0
+
+        self._tx_max_quantity = DEFAULT_STREAMING_LIMIT
+
+    def run(self) -> Deferred[StreamEnd]:
+        return self._deferred
+
+    def fails(self, reason: 'StreamingError') -> None:
+        self.sync_agent.send_stop_block_streaming()
+        self._deferred.errback(reason)
+
+    def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
+        """ Return true if the vertex exists no matter its validation state.
+        """
+        with self.tx_storage.allow_partially_validated_context():
+            return self.tx_storage.transaction_exists(vertex_id)
+
+    def handle_transaction(self, tx: BaseTransaction) -> None:
+        if self._deferred.called:
+            return
+
+        self._tx_received += 1
+        if self._tx_received > self._tx_max_quantity:
+            self.log.warn('too many transactions received',
+                          tx_received=self._tx_received,
+                          tx_max_quantity=self._tx_max_quantity)
+            self.fails(TooManyVerticesReceivedError())
+            return
+
+        assert tx.hash is not None
+        is_duplicated = False
+        if self.partial_vertex_exists(tx.hash):
+            # We reached a block we already have. Skip it.
+            self._tx_repeated += 1
+            is_duplicated = True
+            if self._tx_repeated > self.max_repeated_transactions:
+                self.log.debug('too many repeated transactions received', total_repeated=self._tx_repeated)
+                self.fails(TooManyRepeatedVerticesError())
+
+        try:
+            # this methods takes care of checking if the block already exists,
+            # it will take care of doing at least a basic validation
+            if is_duplicated:
+                self.log.debug('tx early terminate?', tx_id=tx.hash.hex())
+            else:
+                self.log.debug('tx received', tx_id=tx.hash.hex())
+            self.sync_agent.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True)
+        except HathorError:
+            self.fails(InvalidVertexError())
+            return
+        else:
+            # XXX: debugging log, maybe add timing info
+            if self._tx_received % 100 == 0:
+                self.log.debug('tx streaming in progress', txs_received=self._tx_received)
+
+    def handle_transactions_end(self, response_code: StreamEnd) -> None:
+        if self._deferred.called:
+            return
+        self._deferred.callback(response_code)
diff --git a/tests/p2p/test_protocol.py b/tests/p2p/test_protocol.py
index 3aaf098a7..0cf572ec6 100644
--- a/tests/p2p/test_protocol.py
+++ b/tests/p2p/test_protocol.py
@@ -425,7 +425,11 @@ def test_get_data(self):
         self.assertAndStepConn(self.conn, b'^RELAY')
         self.assertIsConnected()
         missing_tx = '00000000228dfcd5dec1c9c6263f6430a5b4316bb9e3decb9441a6414bfd8697'
-        payload = {'until_first_block': missing_tx, 'start_from': [settings.GENESIS_BLOCK_HASH.hex()]}
+        payload = {
+            'first_block_hash': missing_tx,
+            'last_block_hash': missing_tx,
+            'start_from': [settings.GENESIS_BLOCK_HASH.hex()]
+        }
         yield self._send_cmd(self.conn.proto1, 'GET-TRANSACTIONS-BFS', json_dumps(payload))
         self._check_result_only_cmd(self.conn.peek_tr1_value(), b'NOT-FOUND')
         self.conn.run_one_step()