Java多线程:objectMonitor源码解读(3)

2023年 7月 14日 119.4k 0

一、Monitor

1. 什么是Monitor

管程Monitor这个概念是操作系统中的,在操作系统层面对管程的定义是:若干的公共变量,以及能对这些公共变量进行操作的过程 组成的资源管理程序。

通俗的话来说:管程是用来管理公共资源的,并提供了对共享资源同步与互斥的操作。

同步实现方式:由于进程的执行顺序是不确定的,管程通过condition变量来让进入管程中的进程无法执行的时候自己阻塞自己。

互斥实现方式:在一个时间点最多只能有一个进程能够执行操作,所以当一个进程试图进入一个巳被占用的管程时它应当在管程的入口处等待,因而在管程的入口处应当有一个进程等待队列,称作入口等待队列。

2.为什么引入管程

引入管程主要是解决信号量同步机制的缺点,进程要自备同步操作,这样同步操作分散在各个进程里面,不易管理,同时也对编写代码提升了难度。

二、objectMonitor源码分析

1 变量

在变量中有几个比较重要的:_cxq_EntryList_cxq是用于存储没有竞争到锁的线程,_EntryList是用来存储等待锁的线程。

ObjectMonitor() {
    // 多线程竞争锁进入时的单向链表
    ObjectWaiter * volatile _cxq;
    //处于等待锁block状态的线程,会被加入到该列表
    ObjectWaiter * volatile _EntryList;
    // _header是一个markOop类型,markOop就是对象头中的Mark Word
    volatile markOop _header;
    // 抢占该锁的线程数,约等于WaitSet.size + EntryList.size
    volatile intptr_t _count;
    // 等待线程数
    volatile intptr_t _waiters;
    // 锁的重入次数
    volatile intptr_ _recursions;
    // 监视器锁寄生的对象,锁是寄托存储于对象中
    void* volatile  _object;
    // 指向持有ObjectMonitor对象的线程
    void* volatile _owner;
    // 处于wait状态的线程,会被加入到_WaitSet
    ObjectWaiter * volatile _WaitSet;
    // 操作WaitSet链表的锁
    volatile int _WaitSetLock;
    // 嵌套加锁次数,最外层锁的_recursions属性为0
    volatile intptr_t  _recursions;
}

2. 执行流程

JavaMonitor时基于管程来实现的,所以线程进入Monitor的流程和进程进入管程类似。
若当前线程执行执行完毕也将释放Monitor并复位变量的值,以便其它线程线程进入Monitor

当线程准备进入Monitor时,首先线程会被加入到_EntryList队列当中,当某个线程进入到Monitor后将ObjectMonitor中的_owner变量设置为当前线程,同时ObjectMonitor中的计数器_count加1。

若持有Monitor的线程调用wait()方法,将释放当前持有的Monitor_owner变量恢复为null_count自减1,同时该线程进入_WaitSet集合中等待被唤醒。

monitor.png

3. 进入Monitor:enter

3.1 代码注释

// 多个线程同时进入此方法
void ATTR ObjectMonitor::enter(TRAPS) {
    // 获取当前线程的指针
    Thread * const Self = THREAD ;
    void * cur ;
    // 通过CAS操作抢占锁,也就是设置_owner的值为当前线程,如果CAS成功返回旧的值,如果失败返回新的值,其中
    // Self是新值,NULL是旧值。CAS机制当中使用了3个基本操作数:
    // CAS(B, V, A): 要设置的新值B,内存地址V,旧的预期值A。设置成功后返回旧的预期值A
    cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
    // 如果cur == NULL则设置成功,说明此时Monitor锁没有被占用
    if (cur == NULL) {     
       return ;
    }
    // 如果cur == Self则设置失败,说明此时占用Monitor的就是当前线程,此时是重入
    if (cur == Self) {
      // 锁的重入次数加1
       _recursions ++ ;
       return ;
    }
    // 轻量级锁膨胀成重量级锁时,将owner设置为lock属性
    if (Self->is_lock_owned ((address)cur)) {
      assert (_recursions == 0, "internal state error");
        // 正常轻量级锁膨胀成重量级锁,之前已经获取轻量级锁的线程不需要二次调用enter方法
        // 此时再调用enter方法说明是锁嵌套情形,将_recursions置为1
      _recursions = 1 ;
      // 将_owner设置为当前线程
      _owner = Self ;
      OwnerIsThread = 1 ;
      return ;
    }

    // 该Monitor被其他线程占用了,需要抢占
    assert (Self->_Stalled == 0, "invariant");
    // 记录需要抢占的Monitor指针
    Self->_Stalled = intptr_t(this) ;
    // Knob_SpinEarly默认为1,即为true
    // TrySpin让当前线程自旋,自旋次数默认可以自适应调整,如果进入安全点同步则退出自旋,返回1表示抢占成功
    if (Knob_SpinEarly && TrySpin (Self) > 0) {
       assert (_owner == Self      , "invariant") ;
       assert (_recursions == 0    , "invariant") ;
       assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
       Self->_Stalled = 0 ;
       return ;
    }

    //自旋若干次数后依然抢占失败
    assert (_owner != Self          , "invariant") ;
    assert (_succ  != Self          , "invariant") ;
    assert (Self->is_Java_thread()  , "invariant") ;
    JavaThread * jt = (JavaThread *) Self ;

    // //校验安全点同步未完成
    assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
    assert (jt->thread_state() != _thread_blocked   , "invariant") ;
    assert (this->object() != NULL  , "invariant") ;
    assert (_count >= 0, "invariant") ;

    //原子的将_count属性加1,表示增加了一个抢占该锁的线程
    Atomic::inc_ptr(&_count);

    EventJavaMonitorEnter event;
    {
        //修改Java线程状态为BLOCKED_ON_MONITOR_ENTER,此代码块退出后还原成原来的
        JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);

        Self->set_current_pending_monitor(this);

        DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
        if (JvmtiExport::should_post_monitor_contended_enter()) {
            JvmtiExport::post_monitor_contended_enter(jt, this);
        }

        //修改OS线程状态为MONITOR_WAIT,此代码块退出后还原成原来的
        OSThreadContendState osts(Self->osthread());
        //让当前线程的调用栈帧可以walkable,即可以被遍历,需要记录上一次执行的Java字节码
        //然后切换线程的运行状态,从_thread_in_vm切换成_thread_blocked,切换的过程如果进入安全点同步则会被阻塞,此代码块退出将状态从_thread_blocked切换成_thread_in_vm,同样切换过程中如果进入安全点同步则被阻塞
        ThreadBlockInVM tbivm(jt);

        // TODO-FIXME: change the following for(;;) loop to straight-line code.
        for (;;) {
            //将线程的_suspend_equivalent属性置为true,该属性表明当前线程处于悬浮状态
            jt->set_suspend_equivalent();

            //会通过自旋,park等方式不断循环尝试获取锁,直到成功获取锁为止 
            EnterI (THREAD) ;

            //ExitSuspendEquivalent默认返回false
            if (!ExitSuspendEquivalent(jt)) break ;

            // 等待suspended当前线程的线程
            _recursions = 0 ;
            _succ = NULL ;
            exit (false, Self) ;

            jt->java_suspend_self();
        }
        //将关联的ObjectMonitor置为null,表示当前线程已经不在阻塞状态了
        Self->set_current_pending_monitor(NULL);
    }
    //原子的将count属性减1,表示已经有一个线程成功获取锁
    Atomic::dec_ptr(&_count);
    assert (_count >= 0, "invariant") ;
    Self->_Stalled = 0 ;

    // Must either set _recursions = 0 or ASSERT _recursions == 0.
    assert (_recursions == 0     , "invariant") ;
    assert (_owner == Self       , "invariant") ;
    assert (_succ  != Self       , "invariant") ;
    assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

    DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
    if (JvmtiExport::should_post_monitor_contended_entered()) {
        JvmtiExport::post_monitor_contended_entered(jt, this);
    }

    if (event.should_commit()) {
        event.set_klass(((oop)this->object())->klass());
        event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid);
        event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr()));
        event.commit();
    }

    if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
        ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
    }
}

3.2 流程梳理

enter方法用于进入ObjectMonitor,当线程执行enter的时候,先通过CAS去设置_owner为当前线程,然后根据返回的结果进行不同的处理。

1.如果CAS返回值是NULL,说明设置成功了,当前线程成功的进入了ObjectMonitor了,也就是抢占到锁了。

2.如果CAS返回值不为NULL,说明当前有线程已经进入了ObjectMonitor,也就是已经有线程抢占到了锁。

3.判断当前抢占到ObjectMonitor的线程是否就是当前线程,如果是的话锁嵌套次数加1_recursion++,然后退出。

4.如果当前抢占到的ObjectMonitor的线程不是当前线程,就尝试自旋TrySpin获取锁。

5.如果还是没有获取到锁,就进入到EnterI方法,进入EnterI方法之后,还是先尝试自旋TrySping获取锁,如果获取到了就退出。

6.如果最后还是没获取到,就把当前现场封装成ObjectWaiter结构体,然后加入到_cxq队列的头,并且把当前线程挂起。

monitor1.png

4. 退出Monitor:exit

4.1 代码注释


void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
   Thread * Self = THREAD ;
   if (THREAD != _owner) {
     if (THREAD->is_lock_owned((address) _owner)) {
       assert (_recursions == 0, "invariant") ;
       _owner = THREAD ;
       _recursions = 0 ;
       OwnerIsThread = 1 ;
     } else {
       TEVENT (Exit - Throw IMSX) ;
       assert(false, "Non-balanced monitor enter/exit!");
       if (false) {
          THROW(vmSymbols::java_lang_IllegalMonitorStateException());
       }
       return;
     }
   }

   if (_recursions != 0) {
     _recursions--;
     TEVENT (Inflated exit - recursive) ;
     return ;
   }
   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }

   for (;;) {
      assert (THREAD == _owner, "invariant");
      if (Knob_ExitPolicy == 0) {
         OrderAccess::release_store_ptr (&_owner, NULL) ;
         OrderAccess::storeload();
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            TEVENT (Inflated exit - simple egress) ;
            return ;
         }
         TEVENT (Inflated exit - complex egress) ;
         if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
            return ;
         }
         TEVENT (Exit - Reacquired) ;
      } else {
         if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
            OrderAccess::release_store_ptr (&_owner, NULL) ;   // drop the lock
            OrderAccess::storeload() ;
            if (_cxq == NULL || _succ != NULL) {
                TEVENT (Inflated exit - simple egress) ;
                return ;
            }
            if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
               TEVENT (Inflated exit - reacquired succeeded) ;
               return ;
            }
            TEVENT (Inflated exit - reacquired failed) ;
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;
	  // 当QMode等于2并且_cxq不等于空,取_cxq第一个ObjectWaiter对象,并调用ExitEpilog方法,该方法
      // 会唤醒ObjectWaiter对象的线程 
      if (QMode == 2 && _cxq != NULL) {
          w = _cxq ;
          assert (w != NULL, "invariant") ;
          assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
	  // 当QMode等于3,并且_cxq不等于空,把_cxq队列放入到_EntryList的队尾
      if (QMode == 3 && _cxq != NULL) {
          // The following loop is tantamount to: w = swap (&cxq, NULL)
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }
          ObjectWaiter * Tail ;
          for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
          if (Tail == NULL) {
              _EntryList = w ;
          } else {
              Tail->_next = w ;
              w->_prev = Tail ;
          }
      }
	  // 当QMode等于4并且_cxq不等于空,把_cxq队列放入_EntryList的头部
      if (QMode == 4 && _cxq != NULL) {
          w = _cxq ;
          for (;;) {
             assert (w != NULL, "Invariant") ;
             ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
             if (u == w) break ;
             w = u ;
          }
          assert (w != NULL              , "invariant") ;
          ObjectWaiter * q = NULL ;
          ObjectWaiter * p ;
          for (p = w ; p != NULL ; p = p->_next) {
              guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
              p->TState = ObjectWaiter::TS_ENTER ;
              p->_prev = q ;
              q = p ;
          }
          if (_EntryList != NULL) {
              q->_next = _EntryList ;
              _EntryList->_prev = q ;
          }
          _EntryList = w ;
      }

       // 如果_EntryList队列不为空,就从_EntryList中取一个线程唤醒
      w = _EntryList;
      if (w != NULL) {
          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
       // 到这里就是_cxq和_EntryLis都为空,就结束
      w = _cxq ;
      if (w == NULL) continue ;
      // 再次自旋获取 _cxq的首节点
      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
      TEVENT (Inflated exit - drain cxq into EntryList) ;

      assert (w != NULL              , "invariant") ;
      assert (_EntryList  == NULL    , "invariant") ;

      // QMode = 1,把_cxq倒叙插入_EntryList,反之正序插入
      if (QMode == 1) {
         ObjectWaiter * s = NULL ;
         ObjectWaiter * t = w ;
         ObjectWaiter * u = NULL ;
         while (t != NULL) {
             guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
             t->TState = ObjectWaiter::TS_ENTER ;
             u = t->_next ;
             t->_prev = u ;
             t->_next = s ;
             s = t;
             t = u ;
         }
         _EntryList  = s ;
         assert (s != NULL, "invariant") ;
      } else {
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }
      if (_succ != NULL) continue;

     // 最后从_EntryList中取首元素唤醒
      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
}

4.2 流程梳理

exit方法时用于退出ObjectMonitor,在退出的时候也需要唤醒其它等待的线程,根据QMode的值来判断如何唤醒。

1.当QMode=2并且_cxq不为空,取_cxq队列首元素,调用ExitEpilog方法,该方法会唤醒封装成ObjectWaiter对象的线程,此处会立即返回,后面的代码不会执行了。

2.当QMode=3并且_cxq非空,把_cxq队列放入到_EntryList的尾部。

3.当QMode=4并且_cxq非空,把_cxq队列放入到_EntryList的头部。

当上述条件都不满足的时候,就判断_EntryList

4.如果_EntryList的不为空,就取一个线程调用ExitEpilog方法,该方法会唤醒封装成ObjectWaiter对象的线程,然后立即返回。

5.如果_EntryList的为空,就再次判断_cxq是否为空,如果为空的话就自旋继续取。

6.如果_EntryList的为空,_cxq不为空,根据QMode的值,将_cxq插入_EntryList ,如果QMode=1cxq倒叙插入_EntryList 中;如果QMode!=1将cxq插入_EntryList 中。

7.最后再从_EntryList中取出来执行ExitEpilog方法,唤醒线程。

monitor2.png

5. 线程进入等待:wait

5.1 代码注释

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;
   assert(Self->is_Java_thread(), "Must be Java thread!");
   JavaThread *jt = (JavaThread *)THREAD;

   DeferredInitialize () ;
   // 判断当前ObjectMonitor的_owner是不是当前线程
   CHECK_OWNER();

   EventJavaMonitorWait event;
	// 如果线程中断了,且没有未处理的异常
   if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
     if (JvmtiExport::should_post_monitor_waited()) {
        JvmtiExport::post_monitor_waited(jt, this, false);
     }
     if (event.should_commit()) {
       post_monitor_wait_event(&event, 0, millis, false);
     }
     TEVENT (Wait - Throw IEX) ;
     THROW(vmSymbols::java_lang_InterruptedException());
     return ;
   }

   TEVENT (Wait) ;

   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);

   // 把当前线程封装成ObjectWaiter结构体
   ObjectWaiter node(Self);
   // 并且设置状态为TS_WAIT
   node.TState = ObjectWaiter::TS_WAIT;
   Self->_ParkEvent->reset();
   OrderAccess::fence();          
   // 获取_WaitSetLock锁
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   // 将当前线程加入到Wait队列中
   AddWaiter (&node) ;
   // 释放_WaitSetLock锁
   Thread::SpinRelease (&_WaitSetLock) ;

   if ((SyncFlags & 4) == 0) {
      _Responsible = NULL ;
   }
   intptr_t save = _recursions; 
    // 等待线程数+1
   _waiters++;                  
   _recursions = 0;   
   // 调用exit方法,退出当前ObjectMonitor,让其它线程抢占
   exit (true, Self) ;                    
   guarantee (_owner != Self, "invariant") ;

   int ret = OS_OK ;
   int WasNotified = 0 ;
   { 
     OSThread* osthread = Self->osthread();
     OSThreadWaitState osts(osthread, true);
     {
       ThreadBlockInVM tbivm(jt);
       jt->set_suspend_equivalent();

       if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
       } else
       // 挂起当前线程
       if (node._notified == 0) {
         if (millis _ParkEvent->park () ;
         } else {
            ret = Self->_ParkEvent->park (millis) ;
         }
       }
	   // 如果当前线程从park状态被晃醒了
       if (ExitSuspendEquivalent (jt)) {
          jt->java_suspend_self();
       }
     } 
	// 如果线程中断,或者等待超时 则线程的状态就是TS_WAIT
     if (node.TState == ObjectWaiter::TS_WAIT) {
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         // 再次校验线程状态
         if (node.TState == ObjectWaiter::TS_WAIT) {
            // 把线程从WaitSet队列中移除
            DequeueSpecificWaiter (&node) ; 
            assert(node._notified == 0, "invariant");
            // 设置状态为TS_RUN
            node.TState = ObjectWaiter::TS_RUN ;
         }
         Thread::SpinRelease (&_WaitSetLock) ;
     }
  
     guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
     OrderAccess::loadload() ;
     if (_succ == Self) _succ = NULL ;
     WasNotified = node._notified ;
 
     if (JvmtiExport::should_post_monitor_waited()) {
       JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
       if (node._notified != 0 && _succ == Self) {
         node._event->unpark();
       }
     }

     if (event.should_commit()) {
       post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
     }

     OrderAccess::fence() ;

     assert (Self->_Stalled != 0, "invariant") ;
     Self->_Stalled = 0 ;

     assert (_owner != Self, "invariant") ;
     ObjectWaiter::TStates v = node.TState ;
     // 如果线程是TS_RUN 则重新去获取锁
     if (v == ObjectWaiter::TS_RUN) {
         enter (Self) ;
     } else {
         guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
         ReenterI (Self, &node) ;
         node.wait_reenter_end(this);
     }

     guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
     assert    (_owner == Self, "invariant") ;
     assert    (_succ != Self , "invariant") ;
   }

   jt->set_current_waiting_monitor(NULL);

   guarantee (_recursions == 0, "invariant") ;
   _recursions = save;
   _waiters--; 

   assert (_owner == Self       , "invariant") ;
   assert (_succ  != Self       , "invariant") ;
   assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

   if (SyncFlags & 32) {
      OrderAccess::fence() ;
   }

   if (!WasNotified) {
     if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
       TEVENT (Wait - throw IEX from epilog) ;
       THROW(vmSymbols::java_lang_InterruptedException());
     }
   }
}

5.2 流程梳理

objectMonitor::wait方法是用来让线程释放当前锁,并挂起当前线程的。

1.线程线程封装成ObjectWaiter结构体。

2.然后将ObjectWaiter加入到WaitSet队列中。

3.挂起当前线程,并且释放objectMonitor。

4.如果线程被中断了,或者被notify唤醒了,通过enter方法竞争锁。

monitor4.png

6. 唤醒等待线程:notify

6.2 代码注释

void ObjectMonitor::notify(TRAPS) {
  // 检查当前线程的状态
  CHECK_OWNER();
  // 如果等待队列为空,直接返回
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;
  // 获取&_WaitSetLock锁
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  // 从WaitSet中取出第一个元素
  ObjectWaiter * iterator = DequeueWaiter() ;
  if (iterator != NULL) {
     TEVENT (Notify1 - Transfer) ;
     guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
     guarantee (iterator->_notified == 0, "invariant") ;
     if (Policy != 4) {
        // 设置状态为enter
        iterator->TState = ObjectWaiter::TS_ENTER ;
     }
     iterator->_notified = 1 ;
     Thread * Self = THREAD;
     iterator->_notifier_tid = Self->osthread()->thread_id();

     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }
	 // Policy == 0 将 ObjectWaiter放到_EntryList的头部
     // 如果_EntryList 为空 就构建一个
     if (Policy == 0) {       // prepend to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else
     // Policy == 1 将 ObjectWaiter放到_EntryList的尾部
     if (Policy == 1) {     
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
            assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
            Tail->_next = iterator;
            iterator->_prev = Tail;
            iterator->_next = NULL;
        }
     } else
     // Policy == 2
     // 如果_EntryList 为空就放到_EntryList
     // 否则就放到 _cxq 的头
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     } else
     // Policy == 3
     // 将 ObjectWaiter放到_cxq的尾部
     if (Policy == 3) { 
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
                iterator->_next = NULL ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                   break ;
                }
            } else {
                while (Tail->_next != NULL) Tail = Tail->_next ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
                break ;
            }
        }
     } else {
        // 将等待的线程直接唤醒
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;
        ev->unpark() ;
     }

     // 修改线程状态,记录锁竞争开始
     if (Policy wait_reenter_begin(this);
     }
  }
  // 释放&_WaitSetLock 锁
  Thread::SpinRelease (&_WaitSetLock) ;

  if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
     ObjectMonitor::_sync_Notifications->inc() ;
  }
}

6.3 流程梳理

notify是通过ObjectMonitor::notify实现的,notifyAllObjectMonitor::notify类似,只不过是循环处理。

1.如果当前_WaitSet为空直接返回。

2.如果不为空从_WaitSet中取出第一个线程,根据Policy来决定线程放入到那个队列中。

3.Policy = 0,放入到_EntryList队列的排头。

4.Policy = 1,放入到_EntryList队列的末尾。

5.Policy = 2,放入到_EntryList为空就放到_EntryList,否则就放到_cxq排头位置。

6.Policy = 3,放入到_cxq队列末尾。

monitor3.png

二、Java中的Monitor

1. 如何关联objectMonitor

Java中由于多个线程同时访问公共资源会造成线程安全问题,为了解决线程安全问题,Java提供了同步机制,互斥锁机制保证了同一时刻只有一个线程能访问共享资源,这个机制就是基于MonitorJava中每个对象在实例化的时候都会生成一个Monitor对象与之一一对应。

JVMMonitor机制是基于C++实现的,每一个Monitor都有一个objectMonitor对象。

Java中的synchronized重量级锁、waitnotify都是通过Monitor来实现的。

2.下载openjdk

访问官网openjdk

选择对应的版本,这里是8

monitor5.png

选择具体的版本

monitor6.png

点击下载

monitor7.png

objectMonitor.cpp的位置在openjdk\hotspot\src\share\vm\runtime\objectMonitor.cpp目录下。

3. 相关问题回答

3.1 notify 是随机唤醒线程的吗?

结论:不是

根据objectMonitornotify的源码可知,notify唤醒线程的时候是从等待队列WaitSet中获取第一个线程来唤醒的。

3.2 notify唤醒的线程一定能获取到锁吗?

结论:不是

notify唤醒的线程根据Policy的值,把线程加入到_EntryList或者cxq,调用notify的线程退出的时候执行exit方法,然后根据QMode的值决定从_EntryList或者cxq中取出线程。

3.3 wait和sleep的区别

结论:wait会释放锁,sleep不会

根据wait的源码,线程调用wait的时候会调用exit方法,释放当前objectMonitor

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论