原创

死磕ReentrantLock同步机制

前言

在开发的过程中,许多并发的场景下,有可能会出现线程不安全的实例,我们可以使用Synchronized与ReentrantLock进行互斥同步的调用,相信大家Synchronized已经很熟悉了。今天主要详细介绍的是J.U.C包下的ReentrantLock,本文主要是对ReentrantLock的加锁与解锁机制进行一个深入解析~

ReentrantLock,顾名思义,是一个可重入的锁,是一种递归无阻塞的同步机制。比较Synchronzed更有公平锁与非公平锁的两种模式。内部主要利用CAS+AQS队列来实现。

在这里插入图片描述

ReentrantLock介绍

首先可以看到ReentrantLock实现了Lock中定义的方法。Lock接口中定义了最基本的加锁,与尝试加锁的方法,会在ReentrantLock做一个实现。

public class ReentrantLock implements Lock, java.io.Serializable

Node 状态

Node节点是对每个等待资源的线程的一个封装,每个节点内部除了封装线程,还有当前节点的状态值。根据节点状态和在队列中的情况,对同步机制做出处理。

CANCELLED(1): 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。

SIGNAL(-1): 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点从原来的状态更新为SIGNAL。

CONDITION(-2): 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

0:新结点入队时的默认状态。

ReentrantLock 同步机制

ReentrantLock有两个构造函数,用来设定返回公平锁以及非公平锁。

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
    	//默认的构造函数生成的是非公平锁
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
    	//根据传入的fair值判断是返回公平锁还是非公平锁
        sync = fair ? new FairSync() : new NonfairSync();
    }

在通过公平锁或者是非公平锁生成一个实例对象锁后,后续的同步都会围绕该实例锁进行,如果new了一个新的实例锁,那么将是不一样的阻塞队列和锁的获取。可以将一个实例比对为一个锁。

所以,公平锁或者是非公平锁又是什么呢?先看非公平锁

	//非公平锁是一个集成了Sync的静态内部类
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        //进行加锁的操作
        final void lock() {
        	//在调用lock的时候,不判断等待队列而是直接进行锁的占用,判断是否成功设置锁
            if (compareAndSetState(0, 1))
            	//进行设置获取锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
            	//尝试获取锁,不然则添加到等待队列
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

上面比较关键的是没有直接获取到锁而调用acquire(1);方法

	//调用acquire方法,传入进行改变的锁状态值
    public final void acquire(int arg) {
    	//先尝试获取锁,如果获取锁失败了,则请求等待队列,将线程存入
        if (!tryAcquire(arg) &&
        	//如果没有进行轮到执行,在这一步线程会进行阻塞,直到唤醒执行
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //默认执行的情况下interrupt为false,所以执行完毕就结束了,如果出现了中断,则会返回true,则会调用selfInterrupt();
            // **为线程设置中断标识
            selfInterrupt();
    }

先来看看是如何请求获取锁的tryAcquire(arg)

	//请求获取锁进行请求获取锁
	protected final boolean tryAcquire(int acquires) {
		//引用非公平锁的获取锁
        return nonfairTryAcquire(acquires);
    }
	//调用非公平锁获取锁,传入需要改变的锁变量值
	final boolean nonfairTryAcquire(int acquires) {
		//首先获取到当前的线程
        final Thread current = Thread.currentThread();
        //获取有volatile所标识的当前锁状态值
        int c = getState();
        //如果锁状态值为0,代表没有线程占用锁
        if (c == 0) {
        	//使用cas改变当前的状态值
            if (compareAndSetState(0, acquires)) {
            	//设置锁占有是当前线程
                setExclusiveOwnerThread(current);
                //返回true代表成功设置
                return true;
            }
        }
        //如果锁状态值不为0, 并且当前线程就是占用锁的线程
        else if (current == getExclusiveOwnerThread()) {
        	//将原来的锁状态值加上新的需要添加的值
            int nextc = c + acquires;
            //如果结果小于0,则异常
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //将state的值赋值为新的值
            setState(nextc);
            //返回设置成功
            return true;
        }
        //返回设置失败
        return false;
    }

当锁设置成功后就返回true了,由**!tryAcquire(arg)** 直接跳出结束。如果返回的是false,那么就需要执行后续的逻辑acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。 一点点来看,先看Node.EXCLUSIVE是什么

        /** Marker to indicate a node is waiting in exclusive mode */
        //EXCLUSIVE是Node类的一个静态变量,标记以指示节点正在独占模式下等待。
        static final Node EXCLUSIVE = null;

然后来查看addWaiter(Node.EXCLUSIVE),调用addWaiter方法传入节点状态。

    /**
     * Creates and enqueues node for current thread and given mode.
     * 给当前线程和给定模式创造节点并传入队列
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    //传入的是节点模式
    private Node addWaiter(Node mode) {
    	//创建一个节点粗出当前线程和节点模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //定义pred设置tail, tail为队列的尾节点
        Node pred = tail;
        //如果尾部节点不为空
        if (pred != null) {
        	//设置要存入节点的前置节点为tail
            node.prev = pred;
            //通过cas设置tail的属性,传入原来的tail和node节点
            if (compareAndSetTail(pred, node)) {
            	//设置成功后,将原来的tail的next指向node
                pred.next = node;
                //返回node
                return node;
            }
        }
        //如果原来的tail节点为空,或者cas设置tail失败,进入enq()
        enq(node);
        //结束后返回node
        return node;
    }

	/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    //传入需要传入的节点 
    private Node enq(final Node node) {
    	//设置一个for死循环,内部如果cas失败会重新进入for循环
        for (;;) {
        	//获取到tail节点的值
            Node t = tail;
            //如果tail为null
            if (t == null) { // Must initialize
            	//需要初始化头结点,cas由null变为新的node节点
                if (compareAndSetHead(new Node()))
                	//并将head赋值给tail
                    tail = head; //如果cas失败则重新进入for循环
            } else {
            	//当获取的t不为null的时候,说明tail是有节点的,将node的前置节点设置为t
                node.prev = t;
                //然后通过cas设置将node设置为tail,失败的话就for循环重新设置
                if (compareAndSetTail(t, node)) {
                	//原tail.next = node
                    t.next = node;
                    //返回原来的tail节点
                    return t;
                }
            }
        }
    }

addWaiter() 方法会返回传入的node值,将线程信息摄入队列后,然后传入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    //传入添加进队列的node 和 当前的锁状态值
    final boolean acquireQueued(final Node node, int arg) {
    	//设定failed为true
        boolean failed = true;
        try {
        	//设定是否中断为false
            boolean interrupted = false;
            //进入for死循环
            for (;;) {
            	//获取当前node的前驱节点
                final Node p = node.predecessor();
                //如果前驱节点是head节点, 就去执行tryAcquire(arg),上面有解析过。会对锁状态值进行变换
                if (p == head && tryAcquire(arg)) {
                	//如果成功设置了,代表已经轮到了当前的节点执行
                	//并将head设置为当前的node, 并将内部指定的thread和prev置空
                    setHead(node);
                    //将node的前驱节点的next设置为null,表示已经用不到了,方便gc
                    p.next = null; // help GC
                    //设置failed为false 
                    failed = false;
                    //返回中断的情况,可能是下方改变了interrupted状态。
                    return interrupted;
                }
                //如果前置节点的状态为SIGNAL, 那么会执行parkAndCheckInterrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                	//挂起线程并返回是否被中断, 如果后续线程被唤醒,会继续执行for循环,
                    parkAndCheckInterrupt())
                    //如果被中断则设置中断情况为true
                    interrupted = true;
            }
        } finally {
        	//如果中途出现异常,使得failed为true。
            if (failed)
            	//调用cancelAcquire(node) 详细下面有方法
                cancelAcquire(node);
        }
    }

	/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	//获取前置节点的状态。
        int ws = pred.waitStatus;
        //如果前置节点的状态为SIGNAL,表示等待触发状态,那么当前节点就可以挂起
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             * 
             */
            //返回true。
            return true;
        //如果前置节点状态>0,代表是CANCELLED状态,表示当前线程被取消
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            //进入do while循环
            do {
            	//则将pred = pred.prev, 然后让node 的前置节点 = pred
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //让pred的下一个节点设置为node。 这样就跳过了中间为CANCELLED状态的节点。
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //现在waitStatus只能有2种状态,一个是0一个是传播。节点会将前节点转化为SIGNAL状态。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        //返回false
        return false;
    }

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
    	//通过LockSupport挂起当前线程,直到被唤醒
        LockSupport.park(this);
        //返回线程的中断状态
        return Thread.interrupted();
    }

    /**
     * Cancels an ongoing attempt to acquire.
     * 取消正在进行的尝试获取
     * @param node the node
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        //如果传入的node为null
        if (node == null)
        	//直接返回
            return;
		//设置节点内的线程标识位null
        node.thread = null;
		
        // Skip cancelled predecessors
        //获取到节点的前驱节点
        Node pred = node.prev;
        //如果前驱节点状态大于0,那就是CANCELLED状态
        while (pred.waitStatus > 0)
        	//将中间所有节点状态为CANCCELLED的节点进行排除。
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        //获取到下一个节点
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        //设置当前节点的状态为CANCELLED,这样别的节点在运行的时候会跳过当前节点。
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        //如果我们就是tail,那么使用cas将tail设置为pred
        if (node == tail && compareAndSetTail(node, pred)) {
        	//并将下一个节点用通过cas设置为null
            compareAndSetNext(pred, predNext, null);
        } else {
        	//不然如果node不为tail/尾部或者cas设置失败。
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            //如果前驱节点不为head
            if (pred != head &&
            	//并且前驱节点的状态为SIGNAL 
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                  //或者状态值为0和传播,然后使用cas设定为SIGNAL成功
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                //并且前置节点的线程不为null
                pred.thread != null) {
                //获取节点的下一个节点
                Node next = node.next;
                //如果下一个节点不为null 并且 下一个节点的状态 <= 0
                if (next != null && next.waitStatus <= 0)
                	//通过cas设置前置节点的下一个节点,由preNext替换为next。
                    compareAndSetNext(pred, predNext, next);
            } else {
            	//唤醒下一个节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }

以上就是非公平锁线程执行任务,进入阻塞队列进行挂起等待直到轮到他被唤醒获取锁的过程,但是该线程在等待的情况下是如何被唤醒的呢? 来看看unlock() 方法

    /**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then {@link IllegalMonitorStateException} is thrown.
     * 如果当前线程持有锁,则进行计数递减,如果计数为0则释放锁。如果当前线程没有持有锁
     * 则会引发异常。
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    //调用解锁 
    public void unlock() {
        //调用sync的release(1)
        sync.release(1);
    }

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     * 
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
    	//尝试释放锁,递减锁状态值
        if (tryRelease(arg)) {
        	//如果锁进行释放了则获取当前的head节点
            Node h = head;
            //如果head不为null并且head的节点状态不为0,即不为在队列中等待获取锁
            if (h != null && h.waitStatus != 0)
            	//执行unparkSuccessor
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
	//尝试解锁
    protected final boolean tryRelease(int releases) {
    	//获取当前锁的状态值 - 需要递减的值
        int c = getState() - releases;
        //判断当前的线程是否是持有锁的线程
        if (Thread.currentThread() != getExclusiveOwnerThread())
        	//如果不是则异常
            throw new IllegalMonitorStateException();
        //设置free判断锁是否被释放
        boolean free = false;
        //如果递减后的锁状态值为0,则代表锁已经被释放
        if (c == 0) {
            free = true;
            //将当然持有锁的变为null
            setExclusiveOwnerThread(null);
        }
        //将修改后的值设置到锁状态值
        setState(c);
        //返回是否解锁
        return free;
   }

    /**
     * Wakes up node's successor, if one exists.
     * 唤醒后继节点,如果它存在
     * 
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        //获取当前的节点状态 
        int ws = node.waitStatus;
        //如果节点状态值小于0
        if (ws < 0)
        	//cas将原本的值转换为0
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //获取到node的下一个节点
        Node s = node.next;
        //如果s为空或者s已经取消
        if (s == null || s.waitStatus > 0) {
        	//设置s为null
            s = null;
            //设置for循环,从后往前遍历,找到最开头的复合条件的node
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                	//将符合条件的t赋值给s
                    s = t;
        }
        //如果s不为null
        if (s != null)
        	//就将s的线程解锁继续执行。
            LockSupport.unpark(s.thread);
    }

唤醒队列的下一个线程后,下一个线程又能去获取锁,然后执行。但是因为是非公平锁,所以可能会被新的线程直接尝试获取锁,从而队列中的线程获取锁失败,继续挂起等待。这是非公平锁,而在公平锁中,则是按队列的顺序优先执行。

来看看公平锁是如何获取锁的 可以看见公平锁是调用acquire(1) 去获取锁,而非公平锁则是直接用cas进行先尝试获取,如果失败了再调用acquire(1) 去获取锁。

		//这是公平锁的获取锁方式
        final void lock() {
            acquire(1);
        }
        //这是非公平锁的获取锁方式
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

总结

本文以自己的理解介绍了ReentrantLock的同步机制,仅仅只是对lock与unlock方法进行解析,如有理解错误的地方请在评论区探讨!

参考:Java并发之AQS详解

java
lock

  • 作者:LinJy(联系作者)
  • 发表时间:2020-09-29 11:59
  • 版权声明:自由转载-非商用-非衍生-保持署名(null)
  • undefined
  • 评论

    留言