
ProtoTree x->right ==x Caused infinite loop #70

Open
honglei opened this issue Aug 24, 2022 · 41 comments

@honglei
Contributor

honglei commented Aug 24, 2022

Call Stack:

 	Norm.dll!ProtoTree::Bit(const char * key=0x000001db527ca670, unsigned int keysize=96, unsigned int index=64, ProtoTree::Endian keyEndian=ENDIAN_BIG) Line 232	C++
>	Norm.dll!ProtoTree::Remove(ProtoTree::Item & item={...}) Line 459	C++
 	[Inline Frame] Norm.dll!ProtoTreeTemplate<ProtoSortedTree::Item>::Remove(ProtoSortedTree::Item &) Line 354	C++
 	Norm.dll!ProtoSortedTree::Remove(ProtoSortedTree::Item & item={...}) Line 1640	C++
 	Norm.dll!ProtoTimerMgr::RemoveLongTimer(ProtoTimer & theTimer={...}) Line 508	C++
 	Norm.dll!ProtoTimerMgr::OnPulseTimeout(ProtoTimer & __formal) Line 218	C++
 	[Inline Frame] Norm.dll!ProtoTimer::DoTimeout() Line 206	C++
 	Norm.dll!ProtoTimerMgr::OnSystemTimeout() Line 181	C++
 	Norm.dll!ProtoDispatcher::Dispatch() Line 2702	C++
 	Norm.dll!ProtoDispatcher::Run(bool oneShot) Line 1003	C++
 	Norm.dll!ProtoDispatcher::DoThreadStart(void * param=0x000001db52299b18) Line 1061	C++

Code (the loop in ProtoTree::Remove that calls ProtoTree::Bit):

        do                      
        {     
            q = x;              
            if (Bit(key, keysize, x->bit, keyEndian))
                x = x->right; // !!! x == x->right  !!!
            else                
                x = x->left;    
        } while (x != &item);

[Image: tree]

@honglei
Contributor Author

honglei commented Aug 24, 2022

This infinite loop causes calls into norm.dll to block forever:

 	[Inline Frame] Norm.dll!ProtoDispatcher::Lock(_RTL_CRITICAL_SECTION &) Line 306	C++
	Norm.dll!ProtoDispatcher::SuspendThread() Line 1233	C++
 	Norm.dll!NormObjectRetain(const void * objectHandle=0x000001db5b9ce9b0) Line 2726	C++

@honglei honglei changed the title from "function ProtoTree x->right ==x Caused infinite loop" to "ProtoTree x->right ==x Caused infinite loop" Aug 25, 2022
@bebopagogo
Collaborator

Hi @honglei - Do you have some test code that produces this condition? Also, the current Protolib code does not have the "x->right == x" you show here so I am a little confused about which version of NORM/Protolib code you are using?

@honglei
Contributor Author

honglei commented Aug 28, 2022

I mean that, at runtime, x->right points back to x itself (x->right == x) while x != &item, so the loop never terminates.
The code is: https://github.com/USNavalResearchLaboratory/protolib/blob/6db401d30dd36ee4ddc777d8c626c88cdcd5a28f/src/common/protoTree.cpp#L424

Test case:
I send 10K files of 20-25 KB each with NORM, and then move each file to another directory once it has been sent successfully.
This occurred after 2K-6K files had been sent.

@honglei
Contributor Author

honglei commented Aug 28, 2022

create_test_files.py:

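import os

dirPath = r'E:\PythonPrj/testFiles'
# 文件 means "file"; the non-ASCII file names are part of the test.
os.makedirs(dirPath, exist_ok=True)
for i in range(1, 10000):
    # each file holds its index repeated 5,000 times (about 10-25 KB)
    with open(os.path.join(dirPath, f"文件{i:04d}.txt"), "w") as f:
        f.write(f"{i} "*5_000)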

Env: Win10 x64/VC 2022 x64/Python3.10

@bebopagogo
Collaborator

So you are copying the received files to another directory upon NORM_RX_OBJECT_COMPLETED notifications? Are you making any NORM API calls during that time?

@honglei
Contributor Author

honglei commented Sep 7, 2022

Yes. The error occurred on the NORM sender side.
On the NORM sender side, I move the sent files (from the send-file dir) to another directory (the sent-file dir) after NORM_TX_QUEUE_EMPTY or NORM_TX_OBJECT_PURGED.
On the NORM receiver side, I move each received file (stored under a temporary name) to another directory (under its real name) upon NORM_RX_OBJECT_COMPLETED notifications.

the sender side:

async def proc_sender_event(event:pynorm.event.Event):
    #print(f"{str(event)}")
    obj = event.object
    evtType:pynorm.EventType = event.type
    session:pynorm.session.Session = event.session
    linkID:int = session.conf['linkID']
    userName:str = session.conf['userName']
    
    if obj:   #and obj.handle not in (None,pynorm.NORM_OBJECT_INVALID):
        objType:ObjectType = obj.type
        info:str|None =obj.info.decode() if obj.info else None
        print(event, linkID, obj.type, info)
    else:
        print (event)
    
    if evtType == EventType.TX_OBJECT_SENT:
        if objType == ObjectType.FILE and obj.info:
            session.sendFiles.add(obj.info.decode())
            
    elif evtType == EventType.TX_QUEUE_EMPTY:
        '''
        '''
        for key, obj in session.id2obj.items():
            obj.cancel()
        session.id2obj.clear()
        # used to close obj's file handle
        del obj
        del event 
        
        #listProcOpenFiles() # to check whether any file handles are still open
        for file in session.sendFiles:
            try:
                moveFile2SendedDir(file)
            except Exception as e:
                logging.warning( traceback.format_exc() )
        session.sendFiles.clear()
        files = await get_some_waitingfiles_of_link(settings.sendFileDir,linkID)
        pushFiles2session(session, waitingFiles=files)
        
    elif evtType == EventType.TX_OBJECT_PURGED:
        '''
            
        '''
        if obj.handle in session.id2obj:
            session.id2obj.pop(obj.handle)
        info = obj.info.decode()
        if info in session.sendFiles:
            session.sendFiles.remove( info )
        obj.cancel()
        del obj
        del event
        try:
            moveFile2SendedDir( info )
        except Exception as e:
            logging.warning( traceback.format_exc() )
            
        
    elif evtType == EventType.TX_FLUSH_COMPLETED:
        '''
            the NORM sender observes when it no longer has data ready for transmission has completed
        '''
        for file in session.sendFiles:
            moveFile2SendedDir(file)
        session.sendFiles.clear()        
        files = await get_some_waitingfiles_of_link(settings.sendFileDir,linkID)
        pushFiles2session(session, waitingFiles=files)
        
    elif event.type == EventType.TX_OBJECT_PURGED:  # unreachable: TX_OBJECT_PURGED is already handled above
        
        # see the NormSetTxCacheBounds documentation
        handle:int = obj.handle
        if objType and handle in event.session.id2obj:
            event.session.id2obj.pop(handle) 
            
        if objType == ObjectType.FILE and obj.info:
            '''
                move sended files to the sended dir
            '''
            info:str = obj.info.decode()
            obj.cancel()
            del obj
            del event
            moveFile2SendedDir(info)
            if info in session.sendFiles:
                session.sendFiles.remove(info)
            filePath = await get_one_waitingfile_of_link(settings.sendFileDir, session.conf['linkID'])
            pushFiles2session(session, waitingFiles=[filePath])

the NORM receiver side:

async def proc_recver_file_event(event:pynorm.event.Event):
    '''
         file info is {channelName}/{fileName}, encoding: utf-8
    '''
    #mqttClient:MQTTClient = mqtt_client()
    
    evtType:EventType = event.type
    session:pynorm.session.Session = event.session
    obj:pynorm.object.Object = event.object
    #linkID = session.conf['linkID']
    info:str = obj.info.decode() if obj and obj.info else None
    print (f" {evtType}  {info=}")

    if evtType == EventType.RX_OBJECT_NEW:
        now:str = datetime.datetime.now().isoformat()
        
        channelName, fileName = info.split("/")
        obj.begin = now
        obj.fileName = fileName
        obj.channelName = channelName
        session.id2obj[ info ] =  obj


    elif evtType == EventType.RX_OBJECT_COMPLETED:
        '''
        This event is posted when a receive object is completely received, including available NORM_INFO content. 
        Unless the application specifically retains the "object" handle, the indicated NormObjectHandle becomes invalid and must no longer be referenced.
        '''
        if info in session.id2obj:
            session.id2obj.pop( info )
        #obj.cancel()
        
        newFilePath:str = os.path.join(settings.recvFileDir, obj.channelName, obj.fileName )
        oldPath:str = event.object.filename.decode( locale.getpreferredencoding() )
        print (f"{oldPath=} {newFilePath}")
        try:
            #remove conflict file 
            if os.path.isfile(newFilePath):
                os.remove(newFilePath)
            newDirPath:str = os.path.join(settings.recvFileDir, obj.channelName )
            #create dest dir.
            if not os.path.isdir(newDirPath):
                os.makedirs( newDirPath )           
            # move the temp file to the new dir and give it its final real name
            os.rename(src=oldPath, dst=newFilePath)
        except Exception as e:
            logging.warning ( traceback.format_exc() )  
            
        logging.info(f"{obj.channelName, obj.fileName} completed")
        
    elif evtType == EventType.RX_OBJECT_ABORTED:
        oldPath:str = event.object.filename.decode( locale.getpreferredencoding() )
        obj.cancel()
        os.remove(oldPath) # remove the temporary file

        logging.warning( f'{obj.channelName, obj.fileName} aborted' )    

@honglei
Contributor Author

honglei commented Sep 7, 2022

> Are you making any NORM API calls during that time?

Yes, the user may enqueue new files to send (by calling session.fileEnqueue) while event notification processing is not yet complete.

@bebopagogo
Collaborator

bebopagogo commented Sep 7, 2022

The TX_QUEUE_EMPTY notification should only be used as a cue for the sender application to enqueue new content for transmission.

The TX_OBJECT_PURGED notification is how NORM lets the sender application know that it is done with the associated file or memory content. NORM itself will delete the object after the notification is issued, and the application does not need to (and should not) do that itself. It is an opportunity for the application to dispose of (or otherwise manage) the actual file (or memory block) associated with the NormObject, but NORM itself will clean up the NormObject state.

I think the error is occurring because somehow a ProtoTree::Item is being removed from a tree twice (possibly when your code cancels/deletes the obj and then when NORM tries to also do the same) and the Item left/right pointers are not valid. I should probably "harden" the ProtoTree code a little more to avoid the loop issue here ... or at least make sure there is an ASSERT() that will fail instead of an infinite loop.
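The hardening idea above can be sketched as a toy Python model (not ProtoTree's C++ code; `Node`, `bit_of`, `find_parent` and `CorruptTreeError` are hypothetical names). Each step of the walk depends only on the current node and the key, so revisiting any node means the walk will cycle forever without reaching the item (as with x->right == x). The sketch therefore raises at that point instead of spinning:

```python
from typing import Optional


class Node:
    """Hypothetical stand-in for ProtoTree::Item: a key, the bit index tested here, two links."""

    def __init__(self, key: int, bit: int) -> None:
        self.key = key
        self.bit = bit
        self.left: Optional["Node"] = None
        self.right: Optional["Node"] = None


class CorruptTreeError(RuntimeError):
    """Raised instead of looping forever when the links cannot lead to the item."""


def bit_of(key: int, index: int, keysize: int) -> bool:
    # Big-endian numbering: index 0 is the most significant of `keysize` bits.
    return ((key >> (keysize - 1 - index)) & 1) == 1


def find_parent(root: Node, item: Node, keysize: int) -> Node:
    """Follow item's key bits from root, like the do/while loop in ProtoTree::Remove,
    but fail loudly if the walk revisits a node before reaching item."""
    seen = set()
    x: Optional[Node] = root
    while True:
        if x is None:
            raise CorruptTreeError("reached a null link before finding the item")
        if id(x) in seen:
            raise CorruptTreeError(f"cycle at node with key {x.key:#x}; item is not reachable")
        seen.add(id(x))
        q = x
        x = x.right if bit_of(item.key, x.bit, keysize) else x.left
        if x is item:
            return q
```

In C++ the same check could back an ASSERT() or an error return rather than a silent infinite loop.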

My recommendation is to remove the calls you make to:

obj.cancel()
del obj
del event

from your handling of those two notifications since the NORM code does that itself under the hood. Also you should only move the sender file during the PURGED notification.
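A minimal sketch of the handling recommended above. It assumes plain strings for the event types and two hypothetical callbacks (`enqueue_more`, `move_to_sent`) standing in for the application's own functions; in pynorm the event types would be the `EventType` members instead:

```python
from typing import Callable, Optional


def handle_sender_event(event_type: str,
                        info: Optional[str],
                        enqueue_more: Callable[[], None],
                        move_to_sent: Callable[[str], None]) -> None:
    # TX_QUEUE_EMPTY is only a cue to enqueue more content.
    if event_type == "TX_QUEUE_EMPTY":
        enqueue_more()
    # TX_OBJECT_PURGED: NORM is done with the file, so this is where it gets moved.
    # No cancel()/del here: NORM cleans up the NormObject state itself.
    elif event_type == "TX_OBJECT_PURGED" and info is not None:
        move_to_sent(info)
    # Other notifications (TX_OBJECT_SENT, GRTT_UPDATED, ...) need no file handling.
```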

The "norm/examples/python/normMsgr.py" illustrates the handling of TX_QUEUE_EMPTY and TX_OBJECT_PURGED notifications. Even though it is using NORM_OBJECT_DATA (memory blocks) instead of NORM_OBJECT_FILE (files), the handling of those notifications is similar. It simply deletes the application message buffer upon purge, whereas you want to move the sent files to another directory. I do plan to create a "normCast.py" example at some point that is similar to the "normCast.cpp" example. The "normMsgr.cpp" and "normMsgr.py" examples are very similar to an example of file transmission/reception.

@honglei
Contributor Author

honglei commented Sep 7, 2022

With 1 NORM sender and 1 NORM receiver, the sender side has never received a TX_OBJECT_PURGED event; it does receive TX_QUEUE_EMPTY and TX_FLUSH_COMPLETED.

The events received on the NORM sender side:

GRTT_UPDATED
CC_ACTIVE
TX_RATE_CHANGED
CC_ACTIVE
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0001.txt
GRTT_UPDATED
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0002.txt
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0003.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0004.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0005.txt
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0006.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0007.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0008.txt
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0009.txt
GRTT_UPDATED
TX_OBJECT_SENT 1 ObjectType.FILE a中文频道1/文件0010.txt
TX_QUEUE_EMPTY
GRTT_UPDATED
GRTT_UPDATED
TX_FLUSH_COMPLETED

@bebopagogo
Collaborator

A TX_OBJECT_PURGED notification is issued depending upon the sender tx cache parameters which consist of min count, max count, and max size. For example if min_count/max_count is 8, a purge notification for object #1 will be issued when object #9 is enqueued. For NACK-based operation, the NORM sender keeps transmit objects cached for possible response to NACKs ... the cache limit parameters determine when the oldest object is purged from the cache to make room for a newly enqueued object. (and newly enqueued objects are subject to the default timer-based flow control or if the sender app implements flow control using the ACK option)
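The count-based part of this rule can be modelled with a FIFO. This sketch assumes that only the count limit matters (NORM's size limit and min count are ignored) and uses a hypothetical `simulate_purges()` helper that returns (enqueued object, purged object) pairs:

```python
from collections import deque
from typing import Deque, List, Tuple


def simulate_purges(num_objects: int, max_count: int) -> List[Tuple[int, int]]:
    """Enqueue objects 1..num_objects into a count-limited tx cache.

    When the cache is full, the oldest object is purged to make room,
    which is when a TX_OBJECT_PURGED notification would be issued.
    """
    cache: Deque[int] = deque()
    purges: List[Tuple[int, int]] = []
    for obj_id in range(1, num_objects + 1):
        if len(cache) >= max_count:
            purges.append((obj_id, cache.popleft()))
        cache.append(obj_id)
    return purges
```

With a count limit of 8, `simulate_purges(10, 8)` gives `[(9, 1), (10, 2)]`, matching the example above. With fewer objects enqueued than the limit, e.g. `simulate_purges(10, 200)`, nothing is purged at all, so a sender that has enqueued only a handful of files should not expect TX_OBJECT_PURGED for them yet.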

@honglei
Contributor Author

honglei commented Sep 7, 2022

So, can I manually (and safely) move all sent files when TX_FLUSH_COMPLETED occurs?

    elif evtType == EventType.TX_FLUSH_COMPLETED:
        '''
            the NORM sender observes when it no longer has data ready for transmission has completed
        '''
        for key, obj in session.id2obj.items():
            obj.cancel()
        session.id2obj.clear()
        del obj

        for file in session.sendFiles:
            moveFile2SendedDir(file)
        session.sendFiles.clear()

        files = await get_some_waitingfiles_of_link(settings.sendFileDir,linkID)
        pushFiles2session(session, waitingFiles=files)

@honglei
Contributor Author

honglei commented Sep 7, 2022

I need to do

del obj
del event

because when TX_OBJECT_PURGED occurs (which is when I want to move the file to the sent-file dir), NORM still has not closed the related object's file handle:

TX_OBJECT_PURGED 1 ObjectType.FILE a中文频道1/文件0081.txt
11:44:46.996ERROR|deps.py|watch_norm_events|Traceback (most recent call last):
  File "E:\PythonPrj\crud_test\norm\deps.py", line 310, in watch_norm_events
    await proc_event(event)
  File "E:\PythonPrj\crud_test\server\proc_norm.py", line 129, in proc_sender_event
    moveFile2SendedDir(info)
  File "E:\PythonPrj\crud_test\server\proc_norm.py", line 53, in moveFile2SendedDir
    os.rename( currentFilePath, newFilePath )# move to the sent directory
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: '../sendFiles\\a中文频道1/文件0081.txt' -> '../sendedFiles\\a中文频道1\\文件0081.txt'

@honglei
Contributor Author

honglei commented Sep 7, 2022

Even after the TX_OBJECT_PURGED event has been processed, the file handle is still not closed by NORM.
I tried moving the file after 1 second:

    elif event.type == EventType.TX_OBJECT_PURGED:
        # see the NormSetTxCacheBounds documentation
        handle:int = obj.handle
        if objType and handle in event.session.id2obj:
            event.session.id2obj.pop(handle) 
            
        if objType == ObjectType.FILE and obj.info:
            info:str = obj.info.decode()
            #obj.cancel()
            #del obj
            #del event
            #moveFile2SendedDir(info)
            if info in session.sendFiles:
                session.sendFiles.remove(info)
            filePath = await get_one_waitingfile_of_link(settings.sendFileDir, linkID, not_in=session.sendFiles)
            pushFiles2session(session, waitingFiles=[filePath])
            loop = asyncio.get_running_loop()
            loop.call_later(1, moveFile2SendedDir, info)  
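As a stop-gap, retrying the move while Windows still reports the file as locked is less brittle than guessing a fixed delay. A minimal sketch with a hypothetical `move_with_retry()` helper; it assumes the lock surfaces as `PermissionError` (as in the WinError 32 traceback above) and should not be needed once the file is already closed when TX_OBJECT_PURGED is delivered:

```python
import os
import time
from typing import Callable


def move_with_retry(src: str, dst: str, attempts: int = 5, delay: float = 0.2,
                    rename: Callable[[str, str], None] = os.replace) -> int:
    """Move src to dst, retrying while the file is still locked.

    Returns the attempt number that succeeded; re-raises the last
    PermissionError if every attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            rename(src, dst)  # os.replace overwrites an existing dst
            return attempt
        except PermissionError:
            if attempt == attempts:
                raise
            time.sleep(delay)
    raise AssertionError("unreachable")
```

The `rename` parameter exists only so the retry path can be exercised without a real file lock.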

@bebopagogo
Collaborator

OK ... so the event is dispatched to your application before the file is closed? I probably should update the code to close the file handle before the PURGE is issued. By the way, you could also instead use the NormFileRename() function to rename/move the file. I need to double check if that function is part of the Python binding. If not, it should be added. But it's a good idea to have the file handle closed before the PURGE notification in any case, so this is another good find by you. Thanks! I appreciate your rigor and patience.

So I just pushed a commit that adds and calls a NormFileObject::CloseFile() method just before the TX_OBJECT_PURGED notification is issued. This lets your code manipulate the file as desired. For NORM_OBJECT_DATA, the object was left "open" so the application could have the option of accessing and managing the associated memory block (e.g., freeing it if allocated by the application, etc). But for manipulating a file associated with a NORM_OBJECT_FILE, the file handle needs to be closed as you point out.
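The ordering change described above can be illustrated with a small model: close the file handle first, then deliver the purge notification, so the handler is free to rename or move the file (which Windows refuses while the file is open). `TxFileObject` and `purge()` are hypothetical names for this sketch, not NORM's `NormFileObject` API:

```python
from typing import IO, Callable, Optional


class TxFileObject:
    """Hypothetical sender-side file object (not NORM's NormFileObject)."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._fh: Optional[IO[bytes]] = open(path, "rb")

    @property
    def is_open(self) -> bool:
        return self._fh is not None

    def close_file(self) -> None:
        # Safe to call more than once.
        if self._fh is not None:
            self._fh.close()
            self._fh = None


def purge(obj: TxFileObject, notify: Callable[[TxFileObject], None]) -> None:
    """Deliver the purge notification only after the file handle is closed."""
    obj.close_file()
    notify(obj)
```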

@honglei
Contributor Author

honglei commented Sep 8, 2022

NORM is a wonderful and amazing project! I think it's the best multicast file transfer protocol right now;
uftp and udpcast are not actively maintained.

After changing the delay before the rename (after the TX_OBJECT_PURGED event) to 30 seconds, it works now:

elif event.type == EventType.TX_OBJECT_PURGED:
    if info in session.info2obj:
        session.info2obj.pop(info)

    # 1, to avoid same file enqueued twice.
    #if info in session.sendFiles:
        #session.sendFiles.remove( info )        
        
    if objType == ObjectType.FILE and obj.info:
        filePath = await get_one_waitingfile_of_link(settings.sendFileDir, linkID, not_in=session.sendFiles)
        pushFiles2session(session, waitingFiles=[filePath])
        
        # 2. wait 30 seconds for NORM to close the file handle
        loop = asyncio.get_running_loop()
        loop.call_later(30, moveFile2SendedDir, info) 

@bebopagogo
Collaborator

Did you try the new code I commited? I would not expect you to have to wait to move the file since the newly committed code closes the file handle before the PURGE notification is issued. Also, as mentioned, you could use the NormFileRename() call to move/rename the file as it does what is needed (for example, Linux lets you move/rename files while they are open but Windows does not ... although we recently learned if you are using a Samba file system with Linux it is better to close the file while it is being renamed and then re-open it).

In any case, I am curious if the newly committed code works as expected; you should not have to wait to move the file after the PURGE notification is issued. BTW, have you looked at the examples/normCast.cpp example? It provides some useful file cast features such as iterating through directories, supporting optional positive acknowledgment/flow control, etc. As mentioned my plan is to also make a normCast.py example that has the same features / command-line syntax.

@honglei
Copy link
Contributor Author

honglei commented Sep 8, 2022

Commit b896f54 works with the following code:

if info in session.info2obj:
    session.info2obj.pop(info)
      
if objType == ObjectType.FILE and obj.info:
    moveFile2SendedDir(info)
    
    if info in session.sendFiles:
        session.sendFiles.remove( info )
    
    filePath = await get_one_waitingfile_of_link(settings.sendFileDir, linkID, not_in=session.sendFiles)
    pushFiles2session(session, waitingFiles=[filePath])

So, the issue is solved!

@honglei honglei closed this as completed Sep 8, 2022
@honglei
Contributor Author

honglei commented Sep 8, 2022

I have changed pynorm to add type hints and to use enums instead of int constants; this will be committed later.
https://github.com/honglei/fastapi-norm/tree/main/pynorm

> As mentioned my plan is to also make a normCast.py example that has the same features / command-line syntax.

I’m willing to do this work.

@honglei honglei reopened this Sep 9, 2022
@honglei
Contributor Author

honglei commented Sep 9, 2022

The same problem occurred again when I tried to send 100K small files!
Env: Win10/1Gbps Ethernet/VC 2022 x64/Python3.10.6
Args: ccEnable=False txRate=5_000_000

    for i in range(1, 100_000):
        with open(os.path.join(dirPath, f"文件{i:05d}.txt"), "w") as f:
            f.write(f"{i} "*5_000)    

@honglei
Contributor Author

honglei commented Sep 17, 2022

This is caused by:

        else
        {
            PLOG(PL_ERROR, "NormObject::Release() releasing non-retained object?!\n");
        }
        if (0 == reference_count) delete this;
    } // end NormObject::Release()

Python can create two different Object instances for the same NORM object handle, so when those Python Objects are deleted, the following code:

norm/src/pynorm/object.py

Lines 122 to 123 in 40717bb

    def __del__(self):
        libnorm.NormObjectRelease(self)

causes NormObjectRelease to be called twice or more, so `delete this` is executed more than once, which is undefined behavior in C++.

@honglei
Copy link
Contributor Author

honglei commented Sep 17, 2022

For example:

    try:
        obj = self._objects[self._estruct.object]
    except KeyError:
        obj = self._objects[self._estruct.object] = Object(self._estruct.object)

and

norm/src/pynorm/session.py

Lines 124 to 125 in 40717bb

    def fileEnqueue(self, filename, info=""):
        return Object(libnorm.NormFileEnqueue(self, filename.encode('utf-8'), info.encode('utf-8'), len(info)))

Maybe NormFileEnqueue and NormGetNextEvent could keep track of the NORM object's reference_count;
other Python code may not handle this correctly.

@bebopagogo
Collaborator

OK - I think I see the problem. The session.fileEnqueue() creates an Object that is returned, but it is not cached in the instance._objects dictionary. Later when a sender notification for that object is issued, a second Python Object for that same object handle is created and placed into the instance._objects dictionary. That then leads to the error you have found. I think the correct solution is to have the file/data/stream methods that create sender Objects add references to those objects to the instance._objects dictionary so duplicative Python objects are not created. Then all the retain/release reference counting will be managed properly.

FYI, I also noticed that the session.fileEnqueue() (and dataEnqueue() and streamOpen()) function does not handle the case for when the underlying NormFileEnqueue() returns NORM_OBJECT_INVALID due to flow control / buffer limits. I think it should return 'None' in this case (instead of throwing the error check exception it does now) so I will also be making that change. I'm not sure how this one slipped through as the normMsgr.py testing should have revealed this, but I may have overlooked testing it in a way that would have revealed this problem.
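A minimal sketch of the caching described above, assuming a hypothetical `ObjectRegistry` keyed by the native object handle and assuming the invalid handle is 0 (standing in for NORM_OBJECT_INVALID). Both the enqueue call and the event handler would look objects up through the same registry, and an invalid handle yields `None` instead of raising:

```python
from typing import Callable, Dict, Generic, Optional, TypeVar

T = TypeVar("T")

NORM_OBJECT_INVALID = 0  # assumption: placeholder for the C API's invalid-handle value


class ObjectRegistry(Generic[T]):
    """Hypothetical cache so each native handle maps to exactly one Python wrapper."""

    def __init__(self, factory: Callable[[int], T]) -> None:
        self._factory = factory
        self._objects: Dict[int, T] = {}

    def get_or_create(self, handle: int) -> Optional[T]:
        # Flow control / buffer limits: report "not enqueued" as None rather than raising.
        if handle == NORM_OBJECT_INVALID:
            return None
        obj = self._objects.get(handle)
        if obj is None:
            obj = self._objects[handle] = self._factory(handle)
        return obj

    def forget(self, handle: int) -> None:
        # Drop the entry once NORM will no longer report this handle.
        self._objects.pop(handle, None)
```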

@bebopagogo
Collaborator

bebopagogo commented Sep 17, 2022

After thinking about it more, I am not sure this is the problem. Since NormObjectRetain() is called in the pynorm.Object.__init__() method and NormObjectRelease() is called in the pynorm.Object.__del__() method, the underlying NormObject.reference_count should be maintained properly even if multiple Python objects are created that reference the same NormObject. So I think there is something else involved here. I think I need to get your test code and try it myself. It may be related to your event handling somehow.
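The reasoning above can be checked with a toy model of an intrusive reference count (`NativeObject` and `Wrapper` are hypothetical; they only mimic NormObjectRetain()/NormObjectRelease() being called from `__init__`/`__del__`). Two wrappers for the same native object each add one reference and each remove one, so the count returns to NORM's own reference and nothing is deleted early:

```python
class NativeObject:
    """Hypothetical stand-in for a C++ NormObject with an intrusive reference count."""

    def __init__(self) -> None:
        self.reference_count = 1  # NORM's own reference
        self.deleted = False

    def retain(self) -> None:
        self.reference_count += 1

    def release(self) -> None:
        if self.deleted or self.reference_count == 0:
            # mirrors the "releasing non-retained object?!" error path
            raise RuntimeError("release of non-retained object")
        self.reference_count -= 1
        if self.reference_count == 0:
            self.deleted = True  # stands in for `delete this`


class Wrapper:
    """Hypothetical Python wrapper: retain in __init__, release in __del__."""

    def __init__(self, native: NativeObject) -> None:
        self._native = native
        native.retain()

    def __del__(self) -> None:
        self._native.release()
```

The count only goes wrong if some path releases without a matching retain, for example an extra explicit release in addition to the one in `__del__`.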

Note I do think the change mentioned above to cache the Python objects created with fileEnqueue(), etc so the same Python pynorm.Object returned is referenced in the Event handling may be useful. Typically it doesn't matter since the pynorm.Object just references the underlying C++ NormObject and doesn't have any state variables of its own. But if a user application did extend the pynorm.Object, they may want to be able to access the same Python object upon event notifications.

By the way, what is the session.id2obj() in your code? As mentioned, I may need to get a copy of your code that is having the problem for me to be able to diagnose it.

@honglei
Contributor Author

honglei commented Sep 27, 2022

The infinite loop always occurs when ProtoTimerMgr tries to remove the report ProtoTimer (OnReportTimeout).

session.id2obj is used to cache the objects in a session's tx queue, since no NORM API can do this right now.

@honglei
Copy link
Contributor Author

honglei commented Sep 27, 2022

On the NORM sender side I use one instance with two sender sessions:
link 224.1.2.4 uses dir ../sendFiles/a中文频道1 with 10K small files;
link 224.1.2.3 uses dir ../sendFiles/频道2 with 100K small files.

import os
if os.name=='nt':
    #os.add_dll_directory(os.path.dirname(__file__))
    os.add_dll_directory(r'E:\PythonPrj\NORM\norm159\makefiles\win32\x64\Release')
import asyncio
import logging
import socket
import traceback
import re

import win32event
import netifaces

def get_interface_by_ip(ip:str):
    '''
       Get the interface that owns the given IP address.
       Takes about 0.07 s on Win7 and 0.001 s on Deepin/SW.
    '''
    for iface in netifaces.interfaces():
        allAddrs = netifaces.ifaddresses(iface)        
        if addrs:= allAddrs.get(socket.AF_INET):
            for addr in addrs:
                if addr['addr'] == ip:
                    return iface
        if addrs:= allAddrs.get(socket.AF_INET6):  # check IPv6 as well, not only when IPv4 is absent
            for addr in addrs:
                if addr['addr'] == ip:
                    return iface
import pynorm

def pushFiles2session(session:pynorm.session.Session, waitingFiles:list[str], watermark=False) -> list[str]:
    '''
        Try to enqueue some files to `session`; return those that were enqueued successfully.
    '''
    added_files =[]
    for index, filePath in enumerate(waitingFiles):
        channelName, fileName = filePath.split("/")
        fullFilePath:str = os.path.join(settings.sendFileDir, channelName, fileName )
        
        # the session's sending queue already has this filePath: skip files already queued under the same name
        if filePath in session.info2obj:
            continue 
            #obj = session.info2obj.pop(filePath)
            #obj.cancel()
        obj:pynorm.Object = session.fileEnqueue( fullFilePath , info=filePath.encode() )
        handle:int|None = obj.handle
        if handle in (None, pynorm.NORM_OBJECT_INVALID):
            logging.info(f'pushed files:{added_files}')
            return added_files
        else:
            added_files.append( filePath )
            session.info2obj[filePath] = obj 
        if watermark:
            session.setWatermark(obj)
       
    logging.info(f'pushed files:{added_files}')
    return added_files

def moveFile2SendedDir(info:str) -> bool:
    currentFilePath = os.path.join(settings.sendFileDir, info)
    if not os.path.isfile(currentFilePath):
        return False
        #raise FileNotFoundError(currentFilePath)
    channelName, fileName = info.split('/')
    # 1. create the directory if it does not exist
    dirPath = os.path.join(settings.sendedFileDir, channelName )
    if not os.path.exists(dirPath):
        os.makedirs(dirPath)
    newFilePath = os.path.join(dirPath, fileName)
    
    # remove an existing file with the same name
    if os.path.isfile(newFilePath):
        os.remove(newFilePath)
    
    os.rename( currentFilePath, newFilePath )  # move to the sent directory
    logging.info( f"{info} moved!")
    return True

def moveFiles2SendedDir(infos:list[str])-> None:
    for info in infos:
        moveFile2SendedDir(info)
        

def _get_some_waitingfiles_of_link(basicPath:str, channnelName:str, limit:int=20, not_in:dict|None=None):
    '''
        Get the list of files waiting to be sent on one channel.
        limit: maximum number of files to return
    '''
    path:str = os.path.join(basicPath, channnelName)
    # the channel directory has not been created
    if not os.path.isdir(path):
        #os.makedirs(path)
        return []
    
    waiting_files=[]
    dir_or_files = os.listdir(path)
    fileNum =0 
    for fileName in dir_or_files:
        file_path = os.path.join(path,fileName)
        # do not send temporary files or completed files
        if os.path.isfile(file_path) and not re.findall(r"\.tmp$|\.end$", fileName): 
            filePath =  f"{channnelName}/{fileName}"
            if not_in is None or filePath not in not_in:   # not_in unset, or the file is not in the pending list
                waiting_files.append(filePath)
                fileNum +=1
                if fileNum >=limit:
                    break   
    return waiting_files

from aiofiles.os import wrap

get_some_waitingfiles_of_link = wrap(_get_some_waitingfiles_of_link)



import ipaddress
import typing
from pydantic import BaseSettings
class Settings(BaseSettings):
    sendFileDir:str=""
    sendedFileDir:str =""
    
settings = Settings()

import pynorm    
from pynorm.constants import EventType,NackingMode

SENDER_BUFFER_SPACE = 100*1024*1024

#from util.netifaces_ext import get_interface_by_ip
def create_sender_session(instance:pynorm.Instance, 
                          destAddr:str,
                          destPort:int, 
                          localAddr:str|None=None, 
                          localPort:int=0,                            
                          iface:str|None=None, # the interface of <localAddr>
                          srcAddr:str|None=None, #usef for setSSM
                          sessionIndex:typing.Hashable=None,
                          ccEnable:bool = True,
                          rateMin:float|None=-1,
                          rateMax:float|None=-1,
                          txRate:float|None =None,
                          bufferSpace:int = SENDER_BUFFER_SPACE,
                          segmentSize:int = 1400,
                          blockSize:int = 128,
                          numParity:int = 0,
                          loopbackEnable:bool = False
                        
                          ) -> pynorm.session.Session:

    session = instance.createSession(destAddr, destPort,  localId=ipaddress.IPv4Address(localAddr)._ip, index=sessionIndex  )
    if session._session == pynorm.NORM_SESSION_INVALID:
        raise Exception( f"createSession: NORM_SESSION_INVALID {destAddr}")
    if localAddr or localPort:
        session.setTxPort(localPort,txBindAddr=localAddr) #
    if iface:
        session.setMulticastInterface(iface)
    if srcAddr:
        session.setSSM(srcAddr=srcAddr)    
    session.setTxOnly(txOnly=True) 
    if txRate:
        session.setTxRate(txRate*1000)
    session.setCongestionControl(ccEnable=ccEnable) # 
    if loopbackEnable:
        session.setLoopback(True)
    if rateMin>=0 or rateMax>=0:
        session.setTxRateBounds(rateMin=rateMin*1000,rateMax=rateMax*1000)

    #sessionID = randint(0, 1000)
    success:bool = session.startSender(sessionIndex, bufferSpace, segmentSize=segmentSize, blockSize=blockSize, numParity=numParity ) 
    print (f"startSender:{success}")
    #session.setGroupSize(4)
    return session 
                
                
instance:pynorm.Instance = pynorm.Instance()
async def start_one_link(sendFileDir:str, link:dict, pushFileNum:int=10):
    global instance
    localAddr =link['localAddr'] 
    iface = link.get('iface')
    if os.name =='posix' and iface is None:
        loop = asyncio.get_running_loop()
        iface:str|None  = await loop.run_in_executor(None, get_interface_by_ip, localAddr)
        if not iface:
            raise Exception(f"{localAddr} is not valid local Addr")   
        
    session = create_sender_session(instance=instance,
                          destAddr=link['privateMulticastAddr'], destPort=link['destPort'],
                          localAddr = localAddr ,
                          #localPort:int=0,                            
                          iface= iface, # the interface of <localAddr>
                          #srcAddr:str|None=None, #usef for setSSM
                          sessionIndex = link['linkID'],
                          ccEnable = link['ccEnable'],
                          rateMin =link['rateMin'],
                          rateMax =link['rateMax'],
                          txRate =link['txRate'], 
                          bufferSpace = 100*1024*1024,
                          segmentSize = link['segmentSize'],
                          blockSize = link['blockSize'],
                          numParity = link['numParity'],
                          loopbackEnable=False,
                          )
    session.conf= link
    
    if ttl:=link.get('TTL'):
        session.setTTL(ttl)
    if tos:=link.get('TOS'):
        session.setTOS(tos)
    if autoParity:= link.get("autoParity"):
        session.setAutoParity(autoParity) 
    if waitingFiles:= link.get('waitingFiles'):
        pushFiles2session(session, waitingFiles[:pushFileNum])
        
    session.setTxCacheBounds(50_000_000, 200, 200) #NormSetTxCacheBounds
    files = await  get_some_waitingfiles_of_link(settings.sendFileDir,channnelName=link['channels'][0], )
    pushFiles2session(session, waitingFiles=files)
    

from pynorm.constants import  ObjectType

import copy           
async def proc_sender_event(event:pynorm.Event):
    '''
    
    '''
    #print(f"{str(event)}")
    obj = event.object
    evtType:pynorm.EventType = event.type
    session:pynorm.Session = event.session
    linkID:int = session.conf['linkID']
    
    if obj:
        objType:ObjectType = obj.type
        info:str|None =obj.info.decode() if obj.info else None
        print(event, linkID, obj.type, info)
    else:
        pass
        #print (event)
    
    if evtType == EventType.TX_OBJECT_SENT:
        '''
            Sent objects need to be cached until TX_FLUSH_COMPLETED, TX_OBJECT_PURGED or TX_QUEUE_EMPTY is received
        '''
        pass
        
    elif event.type == EventType.TX_OBJECT_PURGED:

            
        ## Don't remove it yet, to avoid the same file being enqueued twice.
        if objType == ObjectType.FILE:
            loop=asyncio.get_running_loop()
            #_future:asyncio.Future  = loop.run_in_executor(None, moveFile2SendedDir, info)  
            # Problem: this can't just be fired off; if the entry is removed from info2obj before the move completes, the file gets enqueued again
            await loop.run_in_executor(None, moveFile2SendedDir, info) 
            if info in session.info2obj:
                session.info2obj.pop(info)        
                
                
    elif evtType == EventType.TX_QUEUE_EMPTY:
        if len(session.info2obj)>200:
            return 
        files = await  get_some_waitingfiles_of_link(settings.sendFileDir, channnelName=session.conf['channels'][0], limit=10, not_in=session.info2obj )
        if files:
            pushFiles2session(session, waitingFiles=files)
        
    elif evtType == EventType.TX_FLUSH_COMPLETED:
        '''
            the flushing process the NORM sender observes when it no longer has data ready for transmission has completed
        '''
        try:
            
            files =await get_some_waitingfiles_of_link(settings.sendFileDir,channnelName=session.conf['channels'][0],  not_in=session.info2obj)
            if files:
                pushFiles2session(session, waitingFiles=files)
            else:
                # When there are no new files to send, move the queued files to the sent-files directory
                infos =[]
                for info, obj in session.info2obj.items():
                    obj.cancel()
                    infos.append(info)
                session.info2obj.clear()
                loop=asyncio.get_running_loop()
                _future:asyncio.Future  = loop.run_in_executor(None, moveFiles2SendedDir, infos)
        except Exception as e:
            print (traceback.format_exc())
        
import typing
from tornado.ioloop import PeriodicCallback

import pynorm
import select
import time
from pynorm import DebugLevel as NormDebugLevel
import logging
async def watch_norm_events(proc_event: typing.Callable[[pynorm.event.Event], typing.Awaitable[None]], timeout:int=1):
    global instance
    instance.setDebugLevel(level=NormDebugLevel.INFO) #2 Warning 3 INFO 4 DEBUG


    handle = instance.getDescriptor()
    while True:
        try:
            if os.name =='nt':
                '''
                    DWORD WaitForSingleObject(
                     [in] HANDLE hHandle,
                     [in] DWORD  dwMilliseconds
                   );
                   [in] dwMilliseconds

                   The time-out interval, in milliseconds. If a nonzero value is specified, 
                   the function waits until the object is signaled or the interval elapses. 
                   If dwMilliseconds is zero, the function does not enter a wait state if the object is not signaled; 
                   it always returns immediately.
                   If dwMilliseconds is INFINITE, the function will return only when the object is signaled.

                '''
                value:int = await asyncio.to_thread(win32event.WaitForSingleObject, handle,1_000 ) # win32event.INFINITE
                if value == win32event.WAIT_TIMEOUT:
                    await asyncio.sleep(0)
                    continue
                elif value == win32event.WAIT_FAILED:
                    print ( f"error:{win32api.GetLastError()}" )
                elif value == win32event.WAIT_ABANDONED:
                    pass 
                elif value == win32event.WAIT_OBJECT_0:       
                    while event:= instance.getNextEvent( ):
                        beg = time.time()
                        await proc_event(event)
                        print ( f" {event.type.name} {time.time()-beg:.4f}")
                    #print(event) 
            else:
                readable, writable, exceptional = await asyncio.to_thread(select.select, [handle],[],[handle]) 
                if readable:
                    event: pynorm.event.Event = instance.getNextEvent( )
                    await proc_event(event)              
                    
        except Exception as e:
            logging.error ( traceback.format_exc() )
    print("watch_norm_events finished" )
    return 0
        
        
async def periodic_func():
    for session in instance._sessions.values():
        print (f"{session.multiAddr},  {session.getSenderReport().get_dict()}")
   
import tornado 
if __name__ =='__main__':
    logging.getLogger().setLevel(logging.INFO)
    link =  { 'linkID': 1, 'userID': 1, 'linkName': 'a', 'destPort': 6003, 'localAddr': '10.65.39.191',
              'TTL': None, 'TOS': None, 'ccEnable': False,
              'rateMin': -1, 'rateMax': 8000000, 'txRate': 8000000,
              'segmentSize': 1400, 'blockSize': 128, 'numParity': 4, 'autoParity': 1,
              'waitingFiles': [], 'channels': ['a中文频道1'],
              'sendFileDir': '../sendFiles',
              'privateMulticastAddr': '224.1.2.4'
              }

    link2 =  { 'linkID': 1, 'userID': 1, 'linkName': 'a', 'destPort': 6003, 'localAddr': '10.65.39.191',
              'TTL': None, 'TOS': None, 'ccEnable': False,
              'rateMin': -1, 'rateMax': 8000000, 'txRate': 8000000,
              'segmentSize': 1400, 'blockSize': 128, 'numParity': 4, 'autoParity': 1,
              'waitingFiles': [], 'channels': ['频道2'],
              'sendFileDir': '../sendFiles',
              'privateMulticastAddr': '224.1.2.3'
              }
    
    
    
    settings.sendFileDir= '../sendFiles'  # read files from subdir, each subdir is a channel 
    settings.sendedFileDir = '../sendedFiles'
    #loop = asyncio.get_event_loop_policy().new_event_loop() 
    t_loop = tornado.ioloop.IOLoop.current()
    loop = t_loop.asyncio_loop
    
    periodic_check:PeriodicCallback = PeriodicCallback( periodic_func , callback_time=10_000) 
    periodic_check.start()
    
    loop.create_task( watch_norm_events( proc_sender_event) )
    asyncio.run(  start_one_link( settings.sendFileDir, link) )
    asyncio.run(  start_one_link( settings.sendFileDir, link2) )
    loop.run_forever()

@honglei
Contributor Author

honglei commented Sep 28, 2022

The ProtoTree holds two different report timers (each belonging to one session), but they have the same timeout value:
Report1.timeout == Report2.timeout


@honglei
Contributor Author

honglei commented Sep 28, 2022

Each instance has a single ProtoDispatcher, so two sessions in one instance share a single ProtoTree, and both report_timers are added to that same ProtoTree.

void ProtoTree::Remove(ProtoTree::Item& item)
{
    ASSERT(0 != item.GetKeysize());
    if (((&item == item.left) || (&item == item.right)) && (NULL != item.parent))  // (1) false here
    {
        //....
    }
    else
    {
        // Root or "item" with no self-pointers 
        // (a.k.a an "internal entry"?)
        // 1) Find terminal "q"  with backpointer to "item"  
        const char* key = item.GetKey();
        unsigned int keysize = item.GetKeysize();
        Endian keyEndian = item.GetEndian();
        Item* x = &item;
        Item* q;
        do                      
        {     
            q = x;              
            if (Bit(key, keysize, x->bit, keyEndian)) // (2) always true: both report timers have the same timeout value
                x = x->right;
            else
                x = x->left;
        } while (x != &item); // (3) always true, since these are two different report timers (sessions 224.1.2.3 and 224.1.2.4)

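The loop uses the item's address to escape the while-loop, but uses the key value to walk the tree; that is the problem. When two different items share the same key, the walk that starts at one of them can end at the other item's self-link and never get back to &item.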
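A minimal Python model of the search loop quoted above may make the failure mode easier to see. This is not Protolib code: Node, get_bit and find_backpointer are hypothetical names, the one-byte keys are made up, and the step limit exists only so the model terminates. It assumes, as the reporter suspects, that two distinct items with identical keys have both been linked into the trie; whether ProtoSortedTree can actually get into that state is what the rest of the thread discusses.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(eq=False, repr=False)  # eq=False: compare nodes by identity, like C++ pointers
class Node:
    name: str
    key: bytes
    bit: int
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def get_bit(key: bytes, index: int) -> bool:
    """Bit `index` of `key`, counting from the most significant bit of key[0]."""
    byte, offset = divmod(index, 8)
    return bool(key[byte] & (0x80 >> offset))


def find_backpointer(item: Node, max_steps: int = 64) -> Node:
    """Model of the do/while loop in ProtoTree::Remove(): the walk is steered by
    item's key bits, but it only stops when it lands on `item` itself (identity)."""
    x = item
    for _ in range(max_steps):
        q = x
        x = x.right if get_bit(item.key, x.bit) else x.left
        if x is item:
            return q
    raise RuntimeError(f"no uplink back to {item.name} after {max_steps} steps")


# Distinct keys: A's right link is an uplink to A itself, so the walk exits at once.
a = Node("A", b"\xa0", bit=0)
b = Node("B", b"\x00", bit=1)
a.left, a.right = b, a
b.left, b.right = b, a
assert find_backpointer(a) is a

# A second, distinct item C with the SAME key as A is linked in below A.
c = Node("C", b"\xa0", bit=2)
a.right = c
c.left, c.right = a, c  # A's key bits lead to C's self-link, never back to A

try:
    find_backpointer(a)
    looped = False
except RuntimeError:
    looped = True  # the real loop has no step limit, so it would spin forever

print(f"walk from A cycles: {looped}")  # walk from A cycles: True
```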

@bebopagogo
Collaborator

bebopagogo commented Sep 28, 2022

The ProtoTimer class uses the ProtoSortedTree for the timer items. The ProtoSortedTree::Insert() and ProtoSortedTree::Remove() methods only allow a single item for a given value to be in the underlying ProtoTree. If two items have the same value (e.g., two ProtoTimers with the same timeout), only one of them is in the tree, while the other is present only in the ordered linked list that is maintained as part of the ProtoSortedTree. So I don't yet understand how the condition you describe could occur, and I am going to look into it.
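A rough Python model of the ProtoSortedTree scheme described above, under stated assumptions: a dict stands in for the ProtoTree, a sorted list stands in for the ordered linked list, and a removed tree item is assumed to be replaced by the next equal-valued item in list order. SortedTreeModel and Item are hypothetical names, and the unique_items_only handling (rejecting duplicates) is an assumption, not Protolib's code. The values match the TestItem test later in the thread.

```python
import bisect
from itertools import count


class Item:
    def __init__(self, name: str, value: float):
        self.name = name
        self.value = value


class SortedTreeModel:
    """Hypothetical model: `tree` (standing in for the ProtoTree) holds at most ONE
    item per value; `entries` (standing in for the linked list) holds every item."""

    def __init__(self, unique_items_only: bool = False):
        self.unique_items_only = unique_items_only
        self.tree: dict = {}
        self.entries: list = []
        self._seq = count()  # tie-breaker: equal values stay in insertion order

    def insert(self, item: Item) -> bool:
        if item.value in self.tree:
            if self.unique_items_only:
                return False  # assumed behavior: duplicates rejected
        else:
            self.tree[item.value] = item  # only the first item of a value enters the tree
        bisect.insort(self.entries, (item.value, next(self._seq), item))
        return True

    def remove(self, item: Item) -> None:
        idx = next(i for i, entry in enumerate(self.entries) if entry[2] is item)
        del self.entries[idx]
        if self.tree.get(item.value) is item:
            # Promote the next list item with the same value into the tree slot.
            successor = next((e[2] for e in self.entries if e[0] == item.value), None)
            if successor is None:
                del self.tree[item.value]
            else:
                self.tree[item.value] = successor

    def names(self) -> list:
        return [entry[2].name for entry in self.entries]


a, b, c = Item("a", 1.0), Item("b", 2.0), Item("c", 1.0)
tree = SortedTreeModel()
for it in (a, b, c):
    tree.insert(it)
print(tree.names(), tree.tree[1.0].name)  # ['a', 'c', 'b'] a
tree.remove(a)
print(tree.names(), tree.tree[1.0].name)  # ['c', 'b'] c
```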

Note that ProtoTree is a Patricia trie implementation, and the left/right item pointers can be uplinks back to the item itself. That produces the case "x == &item", which is the exit condition for the while loop above.

@honglei
Contributor Author

honglei commented Sep 29, 2022

ProtoSortedTree.unique_items_only is false by default, so ProtoTimerTable allows two items with the same value:

#include "protoDefs.h"   // for ProtoSystemTime()  
#include "protoTimer.h"

int main(int argc, char* argv[])
{
    ProtoTimerTable table;
    ProtoTimer cc_timer, ack_timer;
    cc_timer.SetInterval(1);
    cc_timer.SetRepeat(1);
    cc_timer.UpdateKey();

    ack_timer.SetInterval(2);
    ack_timer.SetRepeat(2);
    ack_timer.UpdateKey();

    table.Insert(cc_timer);
    table.Insert(ack_timer);

    table.Remove(cc_timer);
    table.Remove(ack_timer);
}

@bebopagogo
Collaborator

Does that code produce the problem? It is OK for the ProtoSortedTree to have multiple (but different) items with the same value. When two items have the same value, only one of them is inserted into the internal ProtoTree, while both are part of an ordered linked list. If the item that is in the tree is removed, the other one from the linked list takes its place in the tree structure. Just to double check, I wrote some similar test code today, and it worked:

int main(int argc, char* argv[])
{
    TestItem a(1.0);
    TestItem b(2.0);
    TestItem c(1.0);

    TestTree tree;

    tree.Insert(a);
    tree.Insert(b);
    tree.Insert(c);

    TestTree::Iterator iter(tree);
    TestItem* next;
    while (NULL != (next = iter.GetNextItem()))
        TRACE("item value: %lf\n", next->GetValue());

    TRACE("now removing ...\n");

    tree.Remove(b);
    tree.Remove(c);
    tree.Remove(a);

    TRACE("done.\n");

} // end main()

This code worked without a problem. TestTree and TestItem are simple ProtoSortedTree and ProtoSortedTree::Item subclasses that use a floating-point 'double' as the key.

@bebopagogo
Collaborator

BTW - While inspecting the ProtoTime::Key code that the ProtoTimerMgr uses for sorting timeouts, I noticed a bug on BIG_ENDIAN processors where the struct timeval "key" isn't organized properly (byte-order swapping is erroneously done on BIG_ENDIAN processors). I have a fix in my local Protolib repo and will push a commit for it. I bring it up here to make sure you are not using a BIG_ENDIAN processor for your tests.

@honglei
Contributor Author

honglei commented Sep 29, 2022

In my test code, both timers were added to the ProtoTree.
keyEndian is ENDIAN_BIG under Win10/VC2022.

@bebopagogo
Collaborator

bebopagogo commented Sep 29, 2022

Yes, keyEndian ENDIAN_BIG is fine. On LITTLE_ENDIAN machines, ProtoTime::Key reorders the natively little-endian struct timeval into big-endian order, since the key is a struct with two elements and comparing its bits from the most significant end must order the times correctly. The bug is that on BIG_ENDIAN CPUs part of that reordering was also being done, erroneously.
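Why the key has to be big-endian: a Patricia trie compares keys bit by bit starting from the most significant bit, so the key bytes must sort in the same order as the times they encode. The Python sketch below assumes two 4-byte unsigned fields purely for illustration (the real ProtoTime::Key layout and field widths are platform-dependent), and the helper names are hypothetical.

```python
import struct


def key_native_le(sec: int, usec: int) -> bytes:
    """Bytes as a little-endian CPU holds (sec, usec) in memory."""
    return struct.pack("<II", sec, usec)


def key_big_endian(sec: int, usec: int) -> bytes:
    """Most significant byte first, seconds field before microseconds field."""
    return struct.pack(">II", sec, usec)


earlier, later = (1, 0), (256, 0)
print(key_big_endian(*earlier) < key_big_endian(*later))  # True
print(key_native_le(*earlier) < key_native_le(*later))    # False: wrong order

times = [(256, 0), (1, 999_999), (1, 5)]
by_key = sorted(times, key=lambda t: key_big_endian(*t))
print(by_key == sorted(times))  # True
```

Byte-wise comparison of the big-endian keys matches chronological order, including the microseconds tie-break within the same second; the little-endian layout does not.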

One thing I notice above is that the timeout_key of the root->left item has the invalid value of 1000000 microseconds (a normalized struct timeval keeps tv_usec below 1000000). That is odd; I would expect any ProtoTimer in the ProtoTimerMgr to have a valid timeout_key. I may need to do some testing on Windows, since most of my testing is on Linux or MacOSX systems. But if that debug output is from your test code, it could be a result of that, since there is no code there that sets the ProtoTimer::timeout member to an actual time of day like ProtoTimerMgr does.
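For reference, a normalized struct timeval keeps tv_usec in the range [0, 1000000), which is why a value of exactly 1000000 microseconds stands out (this assumes the 1000000 shown is the tv_usec part of the key). A small Python check and normalization, with hypothetical helper names, might look like this:

```python
USEC_PER_SEC = 1_000_000


def is_normalized(sec: int, usec: int) -> bool:
    """A normalized struct timeval keeps tv_usec in [0, 1_000_000)."""
    return 0 <= usec < USEC_PER_SEC


def normalize(sec: int, usec: int) -> tuple:
    """Carry whole seconds out of the microseconds field (floor division also
    handles negative usec, e.g. (5, -1) -> (4, 999999))."""
    carry, usec = divmod(usec, USEC_PER_SEC)
    return sec + carry, usec


print(is_normalized(10, 1_000_000))  # False
print(normalize(10, 1_000_000))      # (11, 0)
```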

@bebopagogo
Collaborator

Note that the ProtoTimer constructor does not initialize the ProtoTimer::timeout member, so your test code needs to initialize ProtoTimer::timeout before the calls to UpdateKey() are made.

@honglei
Contributor Author

honglei commented Sep 29, 2022

I can't find any code in NORM that initializes the ack_timer member, only the following:

    ack_timer.SetListener(this, &NormSenderNode::OnAckTimeout);
    ack_timer.SetInterval(0.0);
    ack_timer.SetRepeat(0);

@bebopagogo
Collaborator

ProtoTimerMgr::ActivateTimer() is where the ProtoTimer::timeout member gets set to the current time of day plus the interval value, before the timer is inserted into the ProtoTimerMgr timer_table.
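A Python model of the ordering described above, with hypothetical names (TimerModel, TimerMgrModel) rather than Protolib's API: the timeout is computed from the clock at activation time, before the timer enters the sorted table. With a frozen clock it also shows how two timers with the same interval activated in the same tick (like the two report timers noted earlier in the thread) end up with identical timeout keys while still being distinct objects.

```python
import time
from typing import Callable, List, Optional


class TimerModel:
    def __init__(self, interval: float):
        self.interval = interval
        self.timeout: Optional[float] = None  # unset until activated, like ProtoTimer::timeout


class TimerMgrModel:
    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self.clock = clock
        self.table: List[TimerModel] = []  # kept sorted by timeout

    def activate(self, timer: TimerModel) -> None:
        # Absolute timeout = now + interval, computed BEFORE the timer is inserted,
        # so the sort key is never uninitialized.
        timer.timeout = self.clock() + timer.interval
        self.table.append(timer)
        self.table.sort(key=lambda t: t.timeout)  # stable: equal keys keep insertion order


# A frozen clock makes the demo deterministic.
mgr = TimerMgrModel(clock=lambda: 100.0)
report1, report2 = TimerModel(2.0), TimerModel(2.0)
mgr.activate(report1)
mgr.activate(report2)
print(report1.timeout == report2.timeout, report1 is report2)  # True False
```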

@bebopagogo
Collaborator

I have looked into this further and cannot determine exactly what the issue may be. I did just push a commit that caches the enqueued Python objects in the pynorm.instance._objects table, which is a WeakValueDictionary. I am not sure whether that will help. For each pynorm.object that is created referencing an underlying C++ NormObject, a NormObjectRetain() call is made in the pynorm.object.__init__() procedure, and the matching NormObjectRelease() call is not made until that pynorm.object is fully dereferenced/deleted (in the pynorm.object.__del__() procedure). So the C++ NormObject::reference_count should be managed correctly even if multiple pynorm.object instances are created for the same C++ NormObject. Only when all references to all pynorm.object instances are gone would the reference_count get down to zero and the NormObject be deleted.
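A minimal Python sketch of the retain/release pairing described above, with a Counter standing in for the C++ reference_count (retain, release, WrappedObject and wrap are hypothetical names, not pynorm's implementation). It assumes CPython, where __del__ runs as soon as the last reference goes away. Each wrapper retains once in __init__ and releases once in __del__, so several wrappers for the same handle stay balanced, and a WeakValueDictionary cache can hand back the existing wrapper without keeping it alive.

```python
import weakref
from collections import Counter

refs = Counter()  # hypothetical stand-in for NormObject::reference_count, per handle


def retain(handle: int) -> None:   # stands in for NormObjectRetain()
    refs[handle] += 1


def release(handle: int) -> None:  # stands in for NormObjectRelease()
    refs[handle] -= 1
    if refs[handle] == 0:
        del refs[handle]  # the C++ object would be freed at this point


class WrappedObject:
    def __init__(self, handle: int):
        self.handle = handle
        retain(handle)

    def __del__(self):
        release(self.handle)


_cache = weakref.WeakValueDictionary()  # handle -> live wrapper, without keeping it alive


def wrap(handle: int) -> WrappedObject:
    obj = _cache.get(handle)
    if obj is None:
        obj = WrappedObject(handle)
        _cache[handle] = obj
    return obj


first = wrap(7)
second = wrap(7)              # cache hit: same wrapper, no extra retain
third = WrappedObject(7)      # an independent wrapper for the same handle
print(dict(refs))             # {7: 2}
del first, second
print(dict(refs))             # {7: 1}
del third
print(dict(refs))             # {}
```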

@honglei
Contributor Author

honglei commented Oct 8, 2022

Is it a Python-side problem or a C++-side problem? If the normCast app were working (it doesn't work right now, see issue #71), we could check whether it is caused by pynorm or not.

@honglei
Contributor Author

honglei commented Oct 13, 2022

Another way:
No Python call should affect the reference_count of the C++ NormObject; if the object handle is invalid, just returning False or raising an Exception is enough.
Python has copy.copy functions, which would cause a wrong reference_count, though my code doesn't use them.
An app using libnorm could tell NORM whether the memory of a NormObject should be released by NORM or by the app.

@bebopagogo
Collaborator

> Is it a Python-side problem or a C++-side problem? If the normCast app were working (it doesn't work right now, see issue #71), we could check whether it is caused by pynorm or not.

I will take a look at issue #71. This is a good idea for narrowing down the problem.

@bebopagogo
Collaborator

> Another way: No Python call should affect the reference_count of the C++ NormObject; if the object handle is invalid, just returning False or raising an Exception is enough. Python has copy.copy functions, which would cause a wrong reference_count, though my code doesn't use them. An app using libnorm could tell NORM whether the memory of a NormObject should be released by NORM or by the app.

One could override the __copy__() and __deepcopy__() methods. Maintaining the C++ reference count matters for the Python API because it keeps the underlying C++ code from deleting the referenced NormObject state as long as there is a Python reference to it.
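Why copy.copy() can unbalance the count: for a class without __copy__, the copy module rebuilds the instance through __reduce_ex__ without calling __init__, so the copy never retains but its __del__ still releases. A minimal sketch of the override suggested above, with hypothetical class names and a Counter in place of the C++ reference count (CPython assumed, so __del__ runs immediately):

```python
import copy
from collections import Counter

refs = Counter()  # hypothetical per-handle reference count


class NaiveWrapper:
    def __init__(self, handle: int):
        self.handle = handle
        refs[handle] += 1  # "retain"

    def __del__(self):
        refs[self.handle] -= 1  # "release"


class SafeWrapper(NaiveWrapper):
    def __copy__(self):
        # Route the copy through __init__ so it takes its own reference.
        return type(self)(self.handle)

    def __deepcopy__(self, memo):
        # The C++ object is shared state, so a deep copy is also just a new reference.
        return type(self)(self.handle)


def count_after(cls, copier) -> int:
    """Create a wrapper, copy it, drop both; a balanced scheme returns to 0."""
    refs.clear()
    original = cls(1)
    duplicate = copier(original)
    del original, duplicate
    return refs[1]


print(count_after(NaiveWrapper, copy.copy))     # -1: the copy released without retaining
print(count_after(SafeWrapper, copy.copy))      # 0
print(count_after(SafeWrapper, copy.deepcopy))  # 0
```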
