服务器
服务器参数 | 最低要求 |
---|---|
CPU | 2 核 |
内存 | 4GB |
带宽 | 5MB |
操作系统 | 推荐 Linux 操作系统,如:Ubuntu 20.04 |
- Docker
- docker-compose
首先pom
org.springframework.integration
spring-integration-stream
org.springframework.integration
spring-integration-mqtt
配置文件
#MQTT配置信息
#MQTT-用户名
spring.mqtt.username=admin
#MQTT-密码
spring.mqtt.password=public
#MQTT-服务器连接地址
spring.mqtt.url=tcp://127.0.0.1:1883
#MQTT-连接服务器默认客户端ID
spring.mqtt.client.id=mqttId
#MQTT-默认的消息推送主题,实际可在调用接口时指定
spring.mqtt.default.topic=topic
package com.pwl.mqtt.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT发送消息配置
*
*/
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
//如果设置成true,发送消息时将不会阻塞。
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
package com.pwl.mqtt.mqtt.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}
controller
package com.pwl.mqtt.mqtt.controller;
import com.pwl.mqtt.mqtt.config.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private MqttGateway mqttGateway;
@GetMapping("/sendMqtt")
public String sendMqtt(String sendData){
mqttGateway.sendToMqtt(sendData,"helloo/123456");
return "OK";
}
}
客户端订阅helloo/123456 地址
访问127.0.0.1:8080/test/sendMqtt?sendData=mqtt 推送消息
mqtt fx 已收到推送的消息(发送消息前要订阅helloo/123456 这个地址)
etc/profile
此模式一般用于 demo 和测试,不用改任何配置,直接敲以下命令执行
sh bin/startup.sh -m standalone
Windows 的话就是
cmd bin/startup.cmd -m standalone
然后从 http://cdh1:8848/nacos/index.html 进入控制台就能看到如下界面了
默认账号和密码为:nacos nacos
cluster 模式需要依赖 MySQL,然后改两个配置文件:
conf/cluster.conf
conf/application.properties
大致如下:
1: cluster.conf,填入要运行 Nacos Server 机器的 ip
192.168.100.155
192.168.100.156
db.num=1
db.url.0=jdbc:mysql://localhost:3306/nacos_config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
db.user=root
db.password=root
创建一个名为nacos_config的 database,将NACOS_PATH/conf/nacos-mysql.sql中的表结构导入刚才创建的库中,这几张表的用途就自己研究吧
step1:在 provider 和 consumer 的 pom 添加以下依赖:
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-alibaba-nacos-discovery
step2:启动类
使用 Spring Cloud 的原生注解 @EnableDiscoveryClient 开启服务注册与发现
package com.crazymaker.cloud.nacos.demo.starter;
import com.crazymaker.springcloud.standard.context.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.List;
@EnableSwagger2
@SpringBootApplication
@EnableDiscoveryClient
@Slf4j
public class ServiceProviderApplication {
public static void main(String[] args) {
ConfigurableApplicationContext applicationContext = SpringApplication.run(ServiceProviderApplication.class, args);
Environment env = applicationContext.getEnvironment();
String port = env.getProperty("server.port");
String path = env.getProperty("server.servlet.context-path");
System.out.println("n--------------------------------------nt" +
"Application is running! Access URLs:nt" +
"Local: tthttp://localhost:" + port + path+ "/index.htmlnt" +
"swagger-ui: thttp://localhost:" + port + path + "/swagger-ui.htmlnt" +
"----------------------------------------------------------");
}
}
step3:服务提供者的 Rest 服务接口
service-provider-demo 提供 一个非常简单的 Rest 服务接口以供访问
package com.crazymaker.cloud.nacos.demo.controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/echo")
public class EchoController {
//回显服务
@RequestMapping(value = "/{string}", method = RequestMethod.GET)
public String echo(@PathVariable String string) {
return "echo: " + string;
}
}
step4:配置文件
spring:
application:
name: service-provider-demo
cloud:
nacos:
discovery:
server-addr: ${NACOS_SERVER:cdh1:8848}
server:
port: 18080
云服务,基础设施即服务IaaS,平台即服务PaaS,软件即服务SaaS
-
软件即服务(SaaS) :这是一个完整的软件应用程序,具有用户界面;
-
平台即服务(PaaS) :开发人员可以在其中部署自己的应用程序的平台;
-
基础设施即服务(IaaS) :提供机器、存储和网络资源,开发人员可以通过安装自己的操作系统、应用程序和支持资源来管理。
没有中台的时代
在传统IT企业,项目的物理结构是什么样的呢?无论项目内部的如何复杂,都可分为“前台”和“后台”这两部分。
什么是前台?
首先,这里所说的“前台”和“前端”并不是一回事。所谓前台即包括各种和用户直接交互的界面,比如web页面,手机app;也包括服务端各种实时响应用户请求的业务逻辑,比如商品查询、订单系统等等。
什么是后台?
后台并不直接面向用户,而是面向运营人员的配置管理系统,比如商品管理、物流管理、结算管理。后台为前台提供了一些简单的配置。
业务中台
业务中台在前文中反复提及,就是把各个项目的共通业务进行下沉,整合成通用的服务平台:
技术中台
技术平台,为了避免研发人员重复发明轮子,向各个项目提供通用的底层框架、引擎、中间件:
数据中台
数据中台,为各个项目进行各种数据采集和分析:
**算法中台 **
算法中台,为各个项目提供算法能力,比如推荐算法、搜索算法、图像识别、语音识别等等:
分布式(distributed)是指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。
集群(cluster)是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。
什么是并发和并行
并发和并行最开始都是操作系统中的概念,表示的是CPU执行多个任务的方式。
并发是指在一段时间内宏观上多个程序同时运行。并行指的是同一个时刻,多个任务确实真的在同时运行。
并发和并行的区别
并发,指的是多个事情,在同一时间段内同时发生了。
并行,指的是多个事情,在同一时间点上同时发生了。
并发的多个任务之间是互相抢占资源的。
并行的多个任务之间是不互相抢占资源的、
只有在多CPU的情况中,才会发生并行。否则,看似同时发生的事情,其实都是并发执行的。
Nginx
Nginx(发音同engine x)是一个网页服务器,它能反向代理HTTP, HTTPS, SMTP, POP3, IMAP的协议链接,以及一个负载均衡器和一个HTTP缓存。
Nginx主要用来做七层负载均衡。
什么是负载均衡
前面提到过了,为了提升网站的各方面能力,我们一般会把多台机器组成一个集群对外提供服务。然而,我们的网站对外提供的访问入口都是一个的,比如www.taobao.com。那么当用户在浏览器输入www.taobao.com的时候如何将用户的请求分发到集群中不同的机器上呢,这就是负载均衡在做的事情。
负载均衡(Load Balance),意思是将负载(工作任务,访问请求)进行平衡、分摊到多个操作单元(服务器,组件)上进行执行。是解决高性能,单点故障(高可用),扩展性(水平伸缩)的终极解决方案。
RPC 是Remote Procedure Call的缩写,译为远程过程调用。是一个计算机通信协议。
这种手段就叫做并发控制。并发控制的目的是保证一个用户的工作不会对另一个用户的工作产生不合理的影响。
没有做好并发控制,就可能导致[脏读、幻读和不可重复读]等问题。
我们常说的并发控制,一般都和数据库管理系统(DBMS)有关,在DBMS中的并发控制的任务是确保在多个事务同时存取数据库中同一数据时不破坏事务的隔离性和统一性以及数据库的统一性。
实现并发控制的主要手段大致可以分为乐观并发控制和悲观并发控制两种。
无论是悲观锁还是乐观锁,都是人们定义出来的概念,可以认为是一种思想。 其实不仅仅是关系型数据库系统中有乐观锁和悲观锁的概念,像memcache、hibernate、tair等都有类似的概念。所以,不应该拿乐观锁、悲观锁和其他的数据库锁等进行对比。
悲观锁
当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。
这种借助数据库锁机制在修改数据之前先锁定,再修改的方式被称之为悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)。
之所以叫做悲观锁,是因为这是一种对数据的修改抱有悲观态度的并发控制方式。我们一般认为数据被并发修改的概率比较大,所以需要在修改之前先加锁。
悲观并发控制实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。
但是在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会; 另外,还会降低并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。
乐观锁
乐观锁( Optimistic Locking ) 是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
相对于悲观锁,在对数据库进行处理的时候,乐观锁并不会使用数据库提供的锁机制。一般的实现乐观锁的方式就是记录数据版本。
乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁。
悲观锁实现方式
悲观锁的实现,往往依靠数据库提供的锁机制。在数据库中,悲观锁的流程如下:
- 在对记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)。
- 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。具体响应方式由开发者根据实际需要决定。
- 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
- 其间如果有其他对该记录做修改或加排他锁的操作,都会等待我们解锁或直接抛出异常。
我们拿比较常用的MySql Innodb引擎举例,来说明一下在SQL中如何使用悲观锁。
要使用悲观锁,我们必须关闭mysql数据库的自动提交属性,因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。set autocommit=0;
乐观锁实现方式
使用乐观锁就不需要借助数据库的锁机制了。
乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是Compare and Swap(CAS)。
CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。
比如前面的扣减库存问题,通过乐观锁可以实现如下:
//查询出商品库存信息,quantity = 3
select quantity from items where id=1
//修改商品库存为2
update items set quantity=2 where id=1 and quantity = 3;
以上,我们在更新之前,先查询一下库存表中当前库存数(quantity),然后在做update的时候,以库存数作为一个修改条件。当我们提交更新的时候,判断数据库表对应记录的当前库存数与第一次取出来的库存数进行比对,如果数据库表当前库存数与第一次取出来的库存数相等,则予以更新,否则认为是过期数据。
一旦发上高并发的时候,就只有一个线程可以修改成功,那么就会存在大量的失败。
对于像淘宝这样的电商网站,高并发是常有的事,总让用户感知到失败显然是不合理的。所以,还是要想办法减少乐观锁的粒度的。
有一条比较好的建议,可以减小乐观锁力度,最大程度的提升吞吐率,提高并发能力!如下:
//修改商品库存
update item
set quantity=quantity - 1
where id = 1 and quantity - 1 > 0
以上SQL语句中,如果用户下单数为1,则通过quantity - 1 > 0的方式进行乐观锁控制。
如何选择
在乐观锁与悲观锁的选择上面,主要看下两者的区别以及适用场景就可以了。
1、乐观锁并未真正加锁,效率高。一旦锁的粒度掌握不好,更新失败的概率就会比较高,容易发生业务失败。
2、悲观锁依赖数据库锁,效率低。更新失败的概率比较低。
随着互联网三高架构(高并发、高性能、高可用)的提出,悲观锁已经越来越少的被使用到生产环境中了,尤其是并发量比较大的业务场景。
加群联系作者vx:xiaoda0423
仓库地址:github.com/webVueBlog/…