查看原文
其他

crossoverJie 2018-05-24

前言

随着互联网的兴起,现在三高( 高可用、高性能、高并发)项目是越来越流行。

本次来谈谈高并发。首先假设一个业务场景:数据库中有一条数据,需要获取到当前的值,在当前值的基础上 +10,然后再更新回去。 如果此时有两个线程同时并发处理,第一个线程拿到数据是10,+10=20更新回去。第二个线程原本是要在第一个线程的基础上再 +20=40,结果由于并发访问取到更新前的数据为10, +20=30

这就是典型的存在中间状态,导致数据不正确。来看以下的例子:

并发所带来的问题

和上文提到的类似,这里有一张 price表,表结构如下:

  1. CREATE TABLE `price` (

  2.  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',

  3.  `total` decimal(12,2) DEFAULT '0.00' COMMENT '总值',

  4.  `front` decimal(12,2) DEFAULT '0.00' COMMENT '消费前',

  5.  `end` decimal(12,2) DEFAULT '0.00' COMMENT '消费后',

  6.  PRIMARY KEY (`id`)

  7. ) ENGINE=InnoDB AUTO_INCREMENT=1268 DEFAULT CHARSET=utf8

我这里写了一个单测:就一个主线程,循环100次,每次把 front的值减去10,再写入一次流水记录,正常情况是写入的每条记录都会每次减去10。

  1.    /**

  2.     * 单线程消费

  3.     */

  4.    @Test

  5.    public void singleCounsumerTest1(){

  6.        for (int i=0 ;i<100 ;i++){

  7.            Price price = priceMapper.selectByPrimaryKey(1);

  8.            int ron = 10 ;

  9.            price.setFront(price.getFront().subtract(new BigDecimal(ron)));

  10.            price.setEnd(price.getEnd().add(new BigDecimal(ron)));

  11.            price.setTotal(price.getFront().add(price.getEnd()));

  12.            priceMapper.updateByPrimaryKey(price) ;

  13.            price.setId(null);

  14.            priceMapper.insertSelective(price) ;

  15.        }

  16.    }

执行结果如下:

可以看到确实是每次都递减10。

但是如果是多线程的情况下会是如何呢:

我这里新建了一个 PriceController

  1.     /**

  2.     * 线程池 无锁

  3.     * @param redisContentReq

  4.     * @return

  5.     */

  6.    @RequestMapping(value = "/threadPrice",method = RequestMethod.POST)

  7.    @ResponseBody

  8.    public BaseResponse<NULLBody> threadPrice(@RequestBody RedisContentReq redisContentReq){

  9.        BaseResponse<NULLBody> response = new BaseResponse<NULLBody>() ;

  10.        try {

  11.            for (int i=0 ;i<10 ;i++){

  12.                Thread t = new Thread(new Runnable() {

  13.                    @Override

  14.                    public void run() {

  15.                        Price price = priceMapper.selectByPrimaryKey(1);

  16.                        int ron = 10 ;

  17.                        price.setFront(price.getFront().subtract(new BigDecimal(ron)));

  18.                        price.setEnd(price.getEnd().add(new BigDecimal(ron)));

  19.                        priceMapper.updateByPrimaryKey(price) ;

  20.                        price.setId(null);

  21.                        priceMapper.insertSelective(price) ;

  22.                    }

  23.                });

  24.                config.submit(t);

  25.            }

  26.            response.setReqNo(redisContentReq.getReqNo());

  27.            response.setCode(StatusEnum.SUCCESS.getCode());

  28.            response.setMessage(StatusEnum.SUCCESS.getMessage());

  29.        }catch (Exception e){

  30.            logger.error("system error",e);

  31.            response.setReqNo(response.getReqNo());

  32.            response.setCode(StatusEnum.FAIL.getCode());

  33.            response.setMessage(StatusEnum.FAIL.getMessage());

  34.        }

  35.        return response ;

  36.    }

其中为了节省资源使用了一个线程池:

  1. @Component

  2. public class ThreadPoolConfig {

  3.    private static final int MAX_SIZE = 10 ;

  4.    private static final int CORE_SIZE = 5;

  5.    private static final int SECOND = 1000;

  6.    private ThreadPoolExecutor executor ;

  7.    public ThreadPoolConfig(){

  8.        executor = new ThreadPoolExecutor(CORE_SIZE,MAX_SIZE,SECOND, TimeUnit.MICROSECONDS,new LinkedBlockingQueue<Runnable>()) ;

  9.    }

  10.    public void submit(Thread thread){

  11.        executor.submit(thread) ;

  12.    }

  13. }

关于线程池的使用今后会仔细探讨。这里就简单理解为有10个线程并发去处理上面单线程的逻辑,来看看结果怎么样:

会看到明显的数据错误,导致错误的原因自然就是有线程读取到了中间状态进行了错误的更新。

进而有了以下两种解决方案:悲观锁和乐观锁。

悲观锁

简单理解下悲观锁:当一个事务锁定了一些数据之后,只有当当前锁提交了事务,释放了锁,其他事务才能获得锁并执行操作。

使用方式如下: 首先要关闭MySQL的自动提交: setautocommit=0;

  1. bigen --开启事务

  2. select id, total, front, end from price where id=1 for update

  3. insert into price values(?,?,?,?,?)

  4. commit --提交事务

这里使用 selectforupdate的方式利用数据库开启了悲观锁,锁定了id=1的这条数据( 注意:这里除非是使用了索引会启用行级锁,不然是会使用表锁,将整张表都锁住。)。之后使用 commit提交事务并释放锁,这样下一个线程过来拿到的就是正确的数据。

悲观锁一般是用于并发不是很高,并且不允许脏读等情况。但是对数据库资源消耗较大。

乐观锁

那么有没有性能好,支持的并发也更多的方式呢?

那就是乐观锁。

乐观锁是首先假设数据冲突很少,只有在数据提交修改的时候才进行校验,如果冲突了则不会进行更新。

通常的实现方式增加一个 version字段,为每一条数据加上版本。每次更新的时候 version+1,并且更新时候带上版本号。实现方式如下:

新建了一张 price_version表:

  1. CREATE TABLE `price_version` (

  2.  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',

  3.  `total` decimal(12,2) DEFAULT '0.00' COMMENT '总值',

  4.  `front` decimal(12,2) DEFAULT '0.00' COMMENT '消费前',

  5.  `end` decimal(12,2) DEFAULT '0.00' COMMENT '消费后',

  6.  `version` int(11) DEFAULT '0' COMMENT '并发版本控制',

  7.  PRIMARY KEY (`id`)

  8. ) ENGINE=InnoDB AUTO_INCREMENT=1268 DEFAULT CHARSET=utf8

更新数据的SQL:

  1. <update id="updateByVersion" parameterType="com.crossoverJie.pojo.PriceVersion">

  2.    UPDATE price_version

  3.    SET front = #{front,jdbcType=DECIMAL},

  4.        version= version + 1

  5.    WHERE id = #{id,jdbcType=INTEGER}

  6.    AND version = #{version,jdbcType=INTEGER}

  7.  </update>

调用方式:

  1.    /**

  2.     * 线程池,乐观锁

  3.     * @param redisContentReq

  4.     * @return

  5.     */

  6.    @RequestMapping(value = "/threadPriceVersion",method = RequestMethod.POST)

  7.    @ResponseBody

  8.    public BaseResponse<NULLBody> threadPriceVersion(@RequestBody RedisContentReq redisContentReq){

  9.        BaseResponse<NULLBody> response = new BaseResponse<NULLBody>() ;

  10.        try {

  11.            for (int i=0 ;i<3 ;i++){

  12.                Thread t = new Thread(new Runnable() {

  13.                    @Override

  14.                    public void run() {

  15.                        PriceVersion priceVersion = priceVersionMapper.selectByPrimaryKey(1);

  16.                        int ron = new Random().nextInt(20);

  17.                        logger.info("本次消费="+ron);

  18.                        priceVersion.setFront(new BigDecimal(ron));

  19.                        int count = priceVersionMapper.updateByVersion(priceVersion);

  20.                        if (count == 0){

  21.                            logger.error("更新失败");

  22.                        }else {

  23.                            logger.info("更新成功");

  24.                        }

  25.                    }

  26.                });

  27.                config.submit(t);

  28.            }

  29.            response.setReqNo(redisContentReq.getReqNo());

  30.            response.setCode(StatusEnum.SUCCESS.getCode());

  31.            response.setMessage(StatusEnum.SUCCESS.getMessage());

  32.        }catch (Exception e){

  33.            logger.error("system error",e);

  34.            response.setReqNo(response.getReqNo());

  35.            response.setCode(StatusEnum.FAIL.getCode());

  36.            response.setMessage(StatusEnum.FAIL.getMessage());

  37.        }

  38.        return response ;

  39.    }

处理逻辑:开了三个线程生成了20以内的随机数更新到 front字段。

当调用该接口时日志如下:

可以看到线程1、4、5分别生成了15,2,11三个随机数。最后线程4、5都更新失败了,只有线程1更新成功了。

查看数据库:

发现也确实是更新的15。

乐观锁在实际应用相对较多,它可以提供更好的并发访问,并且数据库开销较少,但是有可能存在脏读的情况。

总结

以上两种各有优劣,大家可以根据具体的业务场景来判断具体使用哪种方式来保证数据的一致性。

项目地址:https://github.com/crossoverJie/SSM.git

个人博客地址:http://crossoverjie.top。


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存