"""File-based critical sections built on ``fcntl.flock``.

A ``CritSect`` instance maps lock names to lock files and lets a caller run a
function while holding a shared ('read') or exclusive ('write') advisory lock
on the corresponding file.
"""

import errno
import fcntl
import logging
import os
import random
import time


class CritSect:
    """Named advisory locks backed by lock files and ``fcntl.flock``.

    ``current_locks`` maps each lock name held through this instance to the
    open file descriptor that carries the lock.
    """

    # errno values that mean "somebody else holds the lock, try again".
    _BUSY_ERRNOS = (errno.EWOULDBLOCK, errno.EAGAIN)

    def __init__(self, max_retries=100, retry_delay=2,
                 lock_prefix='/tmp/clarens_lock_'):
        """Create an empty lock table.

        max_retries -- number of additional attempts after the first failed
                       non-blocking lock attempt.
        retry_delay -- seconds to sleep after each failed attempt.
        lock_prefix -- path prefix; the lock file is ``lock_prefix + name``.
        """
        self.current_locks = {}
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.lock_prefix = lock_prefix
        # Kept from the original implementation: constructing a CritSect
        # switches the root logger to DEBUG.
        logging.getLogger().setLevel(logging.DEBUG)

    @staticmethod
    def _error(message, exc):
        """Build the ('ERROR', text) exception used throughout this class."""
        return Exception('ERROR', message + str(exc.__class__) + ':' + str(exc))

    def acquire_lock(self, lock_name, s_type='write'):
        """Try to take the lock called *lock_name*.

        s_type 'read' requests a shared lock; any other value requests an
        exclusive lock. If this instance already holds *lock_name* the call
        returns True immediately without touching the file (the existing
        lock type is not changed).

        Returns True when the lock is held on return, False when every
        attempt found the lock busy. Raises Exception('ERROR', text) when the
        lock file cannot be opened or flock fails for another reason.
        """
        logging.debug('start acquire lock %s type: %s', lock_name, s_type)
        if lock_name in self.current_locks:
            obtained = True
        else:
            obtained = self._lock_file(lock_name, s_type)
        logging.debug('end acquire lock %s type: %s', lock_name, s_type)
        return obtained

    def _lock_file(self, lock_name, s_type):
        """Open the lock file and poll flock until locked or out of retries."""
        lock_file_name = self.lock_prefix + lock_name
        if s_type != 'read':
            operation = fcntl.LOCK_EX | fcntl.LOCK_NB
        else:
            operation = fcntl.LOCK_SH | fcntl.LOCK_NB

        try:
            lock_fd = os.open(lock_file_name, os.O_RDWR | os.O_CREAT)
        except Exception as e:
            raise self._error('Exception in synchronize open lock file ', e)

        retries = 0
        while retries <= self.max_retries:
            try:
                fcntl.flock(lock_fd, operation)
            except Exception as e:
                busy = (isinstance(e, IOError)
                        and getattr(e, 'errno', None) in self._BUSY_ERRNOS)
                if busy:
                    retries += 1
                    time.sleep(self.retry_delay)
                    continue
                # Unexpected failure: do not leak the descriptor.
                os.close(lock_fd)
                raise self._error(
                    'Exception in acquire_lock open lock file ', e)
            self.current_locks[lock_name] = lock_fd
            return True

        # Every attempt found the lock busy.
        os.close(lock_fd)
        return False

    def release_lock(self, lock_name):
        """Release *lock_name* if this instance holds it.

        Returns True when the lock was not held; returns None after an actual
        release (unchanged from the original behaviour). Raises
        Exception('ERROR', text) if unlocking or closing fails; in that case
        the descriptor is still closed and the entry removed, so the instance
        never keeps a stale entry.
        """
        logging.debug('start release lock %s', lock_name)
        if lock_name not in self.current_locks:
            return True

        # Forget the lock first so a failure below cannot leave a stale entry.
        lock_fd = self.current_locks.pop(lock_name)
        unlock_error = None
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
        except Exception as e:
            unlock_error = e
        try:
            os.close(lock_fd)
        except Exception as e:
            if unlock_error is None:
                raise self._error(
                    'Exception in release_lock close lock file ', e)
        if unlock_error is not None:
            raise self._error(
                'Exception in release_lock unlock \n', unlock_error)
        logging.debug('end release lock %s', lock_name)

    def synchronize(self, lock_name, object, funct, args, s_type='write'):
        """Run a callable with *args* while holding *lock_name*.

        When *object* is None, *funct* is called directly as ``funct(args)``;
        otherwise *funct* is an attribute name looked up on *object*.
        Returns whatever the callable returns.

        Raises Exception('ERROR', 'Failed to lock') when the lock cannot be
        obtained, and Exception('ERROR', text) wrapping any exception raised
        by the callable. The lock is released in every case.

        NOTE: the lock is released on exit even if this instance already held
        it before the call.
        """
        if not self.acquire_lock(lock_name, s_type):
            raise Exception('ERROR', 'Failed to lock')
        try:
            if object is None:
                target = funct
            else:
                target = getattr(object, funct)
            result = target(args)
        except Exception as e:
            raise self._error('Exception in synchronized work ', e)
        finally:
            self.release_lock(lock_name)
        return result


# Test (run this in several processes at the same time).
if __name__ == "__main__":
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3

    def test(args):
        """Print six numbered progress lines, two seconds apart."""
        for step in range(1, 7):
            print(str(step) + '.PID:' + str(os.getpid())
                  + ' and parameters: ' + str(args))
            if step < 6:
                time.sleep(2)

    read_line('The test works best if you start 2 or more of this application '
              'to test the mutual exclusive nature of the object. \n'
              'Press Enter to continue')

    mut_ex = CritSect()
    write_args = ['wrt1', 'wrt2', 'wrt3']
    read_args = ['rd1', 'rd2', 'rd3']

    print('**************WRITING ONLY******************')
    for _ in range(4):
        mut_ex.synchronize('test_lock', None, test, write_args)
        time.sleep(2)

    print('**************READING ONLY******************')
    for _ in range(4):
        mut_ex.synchronize('test_lock', None, test, read_args, 'read')
        time.sleep(2)

    print('**************READ AND WRITE******************')
    for _ in range(4):
        choice = random.randint(1, 15)
        if choice < 4:
            mut_ex.synchronize('test_lock', None, test, write_args)
        else:
            mut_ex.synchronize('test_lock', None, test, read_args, 'read')
        time.sleep(2)