java怎么用队列解决高并发访问接口的问题

Jessica丶 发布于 07/04 14:56
阅读 6K+
收藏 15
@RequestMapping(value = "test")
@ResponseBody
public void test(HttpServletRequest request){
   //判断数据库是否有记录
   if(!checkDbExist(request)){
      //模拟数据库插入操作
      dbInsert(request);
   }
}

这个接口可能被高并发访问,所以就会出现如果数据重复发送就会出现数据库重复插入的问题,可以在方法上加synchronized关键字解决这个问题,如果是使用队列该怎么处理高并发的情况

加载中
1
penngo
penngo

1、既然有 checkDbExist(request) 操作,那就是表示你的数据要生成唯一校验值,可以在数据库表中记录这个校验值,并设置为唯一键索引,这是保证数据库表不重复的唯一操作。

2、为了提高性能,你还需要把这个校验值放到redis,直接在redis查询是否存在校验值,可以适应减轻数据库压力。

3、为了再提高性能,你可以把用户提交的数据放到kafka、rocketmq等消息队列,在消费者中完成第2步的操作。

0
dongjunv5
dongjunv5

你需要的是 表单重复提交解决方案吧

Jessica丶
Jessica丶
提交操作是别人系统的
0
yzChen233
yzChen233

开放接口的话,可以加一个请求编号的参数,类似于订单号,重复的订单号请求直接返回错误码

leqixiaozi
leqixiaozi
回复 @Jessica丶 : 如果流程比较长或者操作表太多的话,可以考虑MQ等异步操作
leqixiaozi
leqixiaozi
回复 @Jessica丶 : 加唯一订单号应该可以实现,高并发的时候将唯一订单号放入redis中,如果redis不存在这个订单号就存入,如果存在就直接返回错误码,不要直接和数据库打交道
Jessica丶
Jessica丶
高并发情况下不适用
0
using
using

不就是个防止表单重复提交。

自己看:https://blog.csdn.net/zzp354697171/article/details/79776913

今晚打代码
回复 @Jessica丶 : 对于你来说他们不就等于客户端访问吗?:expressionless:
Jessica丶
Jessica丶
提交操作是别人系统的
0
Kit_lee
Kit_lee

1、如不计较是否用第三方服务的话,可以用redis的RLock锁实现插入不重复

2、可以为数据表加唯一键索引,确保插入的数据不重复。

3、先在内存中比较和存储数据,例如ConcurrentHashMap+synchronized,  然后再实际插入到数据库

0
请叫我七点起床
请叫我七点起床

看你这代码没返回值,所以你可以加个MQ试试。将请求参数封装到MQ中,路由规则可以看你请求参数,然后异步去消费。如果想通过java内存来实现,首先要保证你的服务只部署在单台机器上,然后我写了一个,你可以参考一下,应该会有bug,没细测

public class App{

    // 缓存
    private static final ConcurrentHashMap<String,Future<Long>> cache = new ConcurrentHashMap<>();
    // 缓存有效期
    private static final ConcurrentHashMap<String,String> cachePeriod = new ConcurrentHashMap<>();

    public static void main( String[] args ) throws ExecutionException, InterruptedException {

        for (int i = 0; i< 10;i++){
            Random r = new Random();
            String param = String.valueOf(i);
            App app = new App();
            app.compute(param);
        }

        new Thread(() -> {
            while (cachePeriod.size() > 0){
                for (String key : cachePeriod.keySet()) {
                    Long period = Long.valueOf(cachePeriod.get(key));
                    if (System.currentTimeMillis() / 1000 > period){
                        cachePeriod.remove(key);
                        cache.remove(key);
                    }
                }
            }
        }).start();

        while (cache.size() > 0){
            for (Map.Entry<String, Future<Long>> entry : cache.entrySet()) {
                System.out.println(entry.getKey() + "===" + entry.getValue().get());
                Thread.sleep(500);
            }
            System.out.println("=======================");
        }


    }


    /**
     * 数据库底层保存成功后返回数据记录ID
     * 将参数作为key保存到cache中
     * @param param 请求参数
     * @return Long 数据唯一标识
     */
    public Long compute(final String param){

        while (true){
            // 判断之前是否已处理过
            Future<Long> f = cache.get(param);
            // 未处理过
            if(f == null){
                // 封装处理请求操作
                Callable<Long> eval = () -> {
                    Long id = null;
                    //判断数据库是否有记录
                    if(!checkDbExist(param)){
                        //模拟数据库插入操作
                        id = dbInsert(param);
                    }
                    return id;
                };
                // 保存到缓存中并执行
                FutureTask<Long> ft = new FutureTask<>(eval);
                // 设置缓存有效期10秒
                cachePeriod.put(param,String.valueOf((System.currentTimeMillis() / 1000 + 10)));
                f = cache.putIfAbsent(param,ft);
                if(f == null){
                    f = ft;
                    ft.run();
                }
            }

            // 返回处理结果(数据记录ID)
            // 超时时间设置300毫秒
            try {
                return f.get(300,TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                cache.remove(f,param);
                f.cancel(true);
            } catch (ExecutionException e) {
                cache.remove(f,param);
                f.cancel(true);
            } catch (TimeoutException e) {
                cache.remove(f,param);
                f.cancel(true);
            }

        }
    }

    private Long dbInsert(String param) {
        Random r = new Random();
        return Long.valueOf(r.nextInt(20));
    }

    private boolean checkDbExist(String param) {
        if(Integer.valueOf(param) % 2 == 0){
            return true;
        }
        return false;
    }
}

 

0
tonglingbaoyu
tonglingbaoyu

可以考虑使用guava 限流 

当qps 在某个数值以下时 直接使用上面那份代码

当qps在某个数值以上时 可以进入队列

如果对于数据要求不高,而且并发量不算太高可以直接使用Java 阻塞队列 比如  LinkedTransferQueue

或者 Disruptor的框架中对立

如果还是不行可以考虑第其他队列 比如rabbitmq ,kakfa,redis等等

 

0
咦940
咦940

insert ignore语句了解一下,不过需要一个唯一索引。

0
妹子楼顶有鸽子
妹子楼顶有鸽子

数据库加个唯一性校验

0
Soutv
Soutv

把请求中的参数做成key存在redis中,每次来请求先校验一下是否重复,不重复再进入下一步。key可以设置个过期时间,比如60s这种。

返回顶部
顶部