Content.py #107

Open
mfenner1 opened this issue Jul 29, 2011 · 4 comments

Hi folks,

I know I should "git with it" and fork, mod, and make pull requests. I haven't done that and I'm SWAMPED with work. But I'm trying to get my fixes (or at least, fixes for my needs) back to you. I hit a corner case in Content.py where the last ContentJob wasn't being flushed out to the database. Running cvsanaly2 with -g (debug) showed the query being made, but nothing was showing up in the DB. So I did some mucking around and (1) rewrote Content.__process_finished_jobs() (based on Blame.process_finished_jobs()) and (2) tweaked and cleaned up Content.run(). I don't know whether I introduced any other regressions in the process.

Best,
Mark

--- Content.py  2011-07-29 15:58:49.000000000 -0400
+++ /usr/lib/python2.7/site-packages/cvsanaly-2.4-py2.7.egg/pycvsanaly2/extensions/Content.py   2011-07-29 16:43:55.753943196 -0400
@@ -272,38 +272,58 @@

         connection.commit()

-    def __process_finished_jobs(self, job_pool, write_cursor, db):
-#        start = datetime.now()
-        finished_job = job_pool.get_next_done(0)
-        processed_jobs = 0
-        # commit_id is the commit ID. For some reason, the 
-        # documentation advocates tablename_id as the reference,
-        # but in the source, these are referred to as commit IDs.
-        # Don't ask me why!
-        while finished_job is not None:
+    #
+    # MEF 2011-07-29
+    # (seems to be a corner case where the last Content job isn't getting
+    #  flushed out.  trying to fix.) ... It is working for me now.  I did
+    #  two things:
+    #  (1) I rewrote __process_finished_jobs and named it
+    #      process_finished_jobs.  I based its structure off of Blame.py
+    #      which seemed to have a cleaner processing model
+    #  (2) I modified the job_pool.join() command in the .run(). See below.
+    #
+    def process_finished_jobs(self, job_pool, write_cursor, unlocked=False):
+        #
+        # Build insertion SQL
+        #
+        insertContentQuery = """INSERT INTO content
+                                (commit_id, file_id, content, loc, size) 
+                                VALUES (?,?,?,?,?)"""
+
+        #
+        # get first job
+        #
+        if unlocked:
+            job = job_pool.get_next_done_unlocked()
+        else:
+            job = job_pool.get_next_done(0.5)
+
+        listOfQueryArgs = []
+        processedJobCt = 0
+        while job is not None:
+            # build contents
             file_contents = None
-                        
             if not Config().no_content:
-                file_contents = str(finished_job.file_contents)
+                file_contents = str(job.file_contents)
+            # build full query args for insertContentQuery
+            thisQueryArgs = (job.commit_id, job.file_id,
+                             file_contents,
+                             job.file_number_of_lines,
+                             job.file_size)
+            listOfQueryArgs.append(thisQueryArgs)
+            processedJobCt += 1
+
+            # get next job
+            if unlocked:
+                job = job_pool.get_next_done_unlocked()
+            else:
+                job = job_pool.get_next_done(0)
+        if listOfQueryArgs:
+            write_cursor.executemany(statement(insertContentQuery,
+                                               self.db.place_holder),
+                                     listOfQueryArgs)
+        return processedJobCt

-            query = """
-                insert into content(commit_id, file_id, content, loc, size) 
-                    values(?,?,?,?,?)"""
-            insert_statement = statement(query, db.place_holder)
-            parameters = (finished_job.commit_id,
-                          finished_job.file_id,
-                          file_contents,
-                          finished_job.file_number_of_lines,
-                          finished_job.file_size)
-                                
-            execute_statement(insert_statement, parameters, write_cursor, db,
-                       "Couldn't insert, duplicate record?", 
-                       exception=ExtensionRunError)
-            
-            processed_jobs += 1
-            finished_job = job_pool.get_next_done(0)
-            
-        return processed_jobs

     def run(self, repo, uri, db):
         # Start the profiler, per every other extension
@@ -345,11 +365,13 @@
             raise ExtensionRunError("Couldn't prepare table because " + \
                                     str(e))

-        queuesize = Config().max_threads
-        printdbg("Setting queuesize to " + str(queuesize))
+        maxJobQueueSize = Config().max_threads
+        printdbg("Setting maxJobQueueSize to " + str(maxJobQueueSize))

         # This is where the threading stuff comes in, I expect
-        job_pool = JobPool(repo, path or repo.get_uri(), queuesize=queuesize)
+        job_pool = JobPool(repo,
+                           path or repo.get_uri(),
+                           queuesize=maxJobQueueSize) #queuesize)

         # This filters files if they're not source files.
         # I'm pretty sure "unknown" is returning binary files too, but
@@ -360,6 +382,10 @@
                 "ft.type in('code') and " + \
                 "f.repository_id = ?"
                 # "ft.type in('code', 'unknown') and " + \
+
+        #
+        # find existing content
+        #
         read_cursor.execute(statement(query, db.place_holder), (repo_id,))
         code_files = [item[0] for item in read_cursor.fetchall()]
         query = """select c.file_id, c.commit_id from content c, files f
@@ -369,17 +395,25 @@
         existing_content = [(item[0], item[1]) \
                             for item in read_cursor.fetchall()]

+        #
+        # Get commit x file x action x composed
+        #
         fr = FileRevs(db, connection, read_cursor, repo_id)

-        i = 0
+        currJobQueueSize = 0 
         # Loop through each file and its revision
         for revision, commit_id, file_id, action_type, composed in fr:
-#            loop_start = datetime.now()
+            #
+            # skip non-code files and existing content
+            #
             if file_id not in code_files:
                 continue
             if (file_id, commit_id) in existing_content:
                 continue

+            #
+            # compute revision and proper path
+            #
             try:
                 relative_path = fr.get_path()
             except AttributeError, e:
@@ -398,28 +432,52 @@
                 printdbg("Skipping file %s", (relative_path,))
                 continue

+            #
+            # create a content fetching job
+            #
             job = ContentJob(commit_id, file_id, rev, relative_path)
             job_pool.push(job)
-            i = i + 1
-            if i >= queuesize:
-                printdbg("Content queue is now at %d, flushing to database", 
-                         (i,))
+            currJobQueueSize += 1 # i = i+1
+
+            #
+            # job queue is full; flush finished jobs to the database
+            # 
+            if currJobQueueSize >= maxJobQueueSize:
+                printdbg("Content job queue is now at %d. Flushing to database", 
+                         (currJobQueueSize,))

-                processed_jobs = self.__process_finished_jobs(job_pool, 
-                                                              write_cursor, db)
+                numProcessedJobs = self.process_finished_jobs(job_pool,
+                                                              write_cursor)
                 connection.commit()
-                i = i - processed_jobs
-                if processed_jobs < (queuesize / 5):
-                    job_pool.join()
+                currJobQueueSize -= numProcessedJobs
+                job_pool.join()

-        job_pool.join()
-        self.__process_finished_jobs(job_pool, write_cursor, db)
+                #
+                # MEF 2011-07-29
+                # other possible source of fault:
+                # if the num of completed jobs was "too big",
+                # then the current job was never joined in
+                # unless that would happen at the job_pool.join()
+                # outside the loop
+                #

-        profiler_start("Inserting results in db")
-        #self.__insert_many(write_cursor)
+                #if processed_jobs < (queuesize / 5):
+                #    job_pool.join()
+
+        #
+        # process remaining content jobs
+        #
+        job_pool.join()
+        self.process_finished_jobs(job_pool, write_cursor)                
+
+        #
+        # force commit
+        #
         connection.commit()
-        profiler_stop("Inserting results in db")

+        #
+        # clean up connections and cursors
+        #
         read_cursor.close()
         write_cursor.close()
         connection.close()
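
For readers skimming the diff: the structural change is that the patched process_finished_jobs() accumulates one argument tuple per finished job and then writes them all with a single executemany() call, instead of issuing one INSERT per job. Below is a minimal standalone illustration of that batching pattern, using plain sqlite3 and a cut-down toy table rather than cvsanaly's own classes (the real content table also carries content and size columns):

import sqlite3

# Toy stand-in for the content table written by the Content extension.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE content (commit_id INTEGER, file_id INTEGER, loc INTEGER)")

# Accumulate one tuple per finished job, then insert them in a single batched call,
# which is what the patched process_finished_jobs() does via executemany().
rows = [(1, 10, 120), (1, 11, 45), (2, 10, 130)]
cur.executemany("INSERT INTO content (commit_id, file_id, loc) VALUES (?, ?, ?)", rows)
conn.commit()

print cur.execute("SELECT COUNT(*) FROM content").fetchone()[0]  # prints 3
conn.close()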

cflewis commented Jul 29, 2011

Hi Mark,
Thanks for your interest in the project, and your attempt to bring fixes back to us. We really appreciate it. However, this is a really large diff, and I feel uncomfortable patching it in, particularly as I have a few comments here and there.

When you feel more comfortable/have the time to submit a pull request, I will gladly review it for you.

I'll leave this ticket open for now.

Thanks once again!

linzhp (Member) commented Jul 30, 2011

Mark,

It looks like you misunderstood job_pool.join(). If you look at the implementation, you will find that it waits for all jobs in the queue to finish, rather than joining the current job into the pool (that has already been done by job_pool.push(job)). Calling job_pool.join() too often will slow down the program, so I tried to avoid calling it unless there was heuristic evidence that the consumer of the job queue is slower than the producer, indicated by "if processed_jobs < (queuesize / 5)".
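
To make that heuristic concrete, here is a small standalone analogy using plain threading and Queue from the Python 2 standard library rather than cvsanaly's JobPool; queuesize and the queuesize / 5 threshold mirror the original loop, while the worker function and the numbers are invented purely for illustration:

import threading
import time
import Queue  # Python 2 standard library module

queuesize = 10
done = Queue.Queue()

def worker(n):
    time.sleep(0.01)   # stand-in for fetching one file's contents
    done.put(n)        # a "finished job" lands here

threads = []
pending = 0
for n in range(30):
    t = threading.Thread(target=worker, args=(n,))
    t.start()
    threads.append(t)
    pending += 1
    if pending >= queuesize:
        # Drain whatever has already finished without blocking,
        # roughly like repeated get_next_done(0) calls.
        processed = 0
        while True:
            try:
                done.get_nowait()
            except Queue.Empty:
                break
            processed += 1
        pending -= processed
        # The original heuristic: only pay for a blocking join when the
        # workers are clearly falling behind the producer.
        if processed < (queuesize / 5):
            for t in threads:
                t.join()
            threads = []

# After the loop: wait for everything still running, then drain the rest,
# analogous to the final job_pool.join() / process_finished_jobs() in run().
for t in threads:
    t.join()
while not done.empty():
    done.get_nowait()

The point of the threshold is that a blocking join is only worth paying for when non-blocking draining recovers very little, i.e. when the producer is clearly outrunning the workers.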

However, thanks for adding more comments and making variables more readable.

mfenner1 (Author) commented

> It looks like you misunderstood job_pool.join().

Definitely, I didn't even look at the implementation. I was just guessing from how I saw it being used. Thank you for giving me the quick summary.

So, my question is: do you see anything in __process_finished_jobs (the old version) that might have been responsible for the missed DB commit behavior I was seeing?

Here's a more complete description of the problem I saw: If there were 11 total content jobs, the first 10 got inserted. When I added another content job -- by adding another commit to the underlying repository -- 11 of 12 content jobs got inserted. The 12th was processed and generated a DB insertion query, but it did not show up in the final DB -- almost as if there was no final connection.commit(). Of course, there was a final connection.commit() [at the end of .run()] both in the old code and the new code so that sort of rules it out as the source of the problem (unless a hidden exception was doing something funky).
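
For anyone trying to reproduce this, a quick way to check whether the last job's row actually landed is to count rows in the content table directly. This is only a sketch, assuming the SQLite backend; the database file name below is hypothetical and should be replaced with whatever database the cvsanaly2 run writes to:

import sqlite3

# Hypothetical database file; point this at the database your cvsanaly2 run used.
conn = sqlite3.connect("cvsanaly.db")
cur = conn.cursor()
# The extension inserts one (commit_id, file_id, content, loc, size) row per
# processed ContentJob, so the per-commit counts should match what you expect.
cur.execute("SELECT commit_id, COUNT(*) FROM content GROUP BY commit_id ORDER BY commit_id")
for commit_id, n in cur.fetchall():
    print commit_id, n
conn.close()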

Thanks guys and have a good weekend.

linzhp (Member) commented Jul 30, 2011

I reviewed the old code, and couldn't find anything responsible for the bug. The only change in your code that could change the behavior of the program is the removal of "if processed_jobs < (queuesize / 5)". If that fixed the bug, I would be surprised.
