本文共 11149 字,大约阅读时间需要 37 分钟。
数据库事务需要包含4个基本特性,常称为ACID。具体如下:
在MongoDB中,对单文档的操作是原子的。由于可以在单个文档中使用内嵌文档或数组来获取数据之间的关系,而不必跨多个文档或集合进行范式化。对于那些需要对多个文档(在单个或多个集合中)进行原子性读写的场景,MongoDB支持多文档事务。MongoDB4.2版本全面支持多文档事务。
事务属性 | 支持程度 |
---|---|
Atomocity | 单表单文档:1.x 就支持;复制集多表多行:4.0;分片集群多表多行:4.2 |
Consistency | writeConcern, readConcern (3.2) |
Isolation | readConcern (3.2) |
Durability | Journal日志机制 和 Replication副本集机制 |
writeConcern决定一个写操作落到多少个节点上才算成功。
语法格式:
{ w:, j: , wtimeout: }
MongoDB5.0版本开始,默认是w: majority
。当w: majority
时,j
的默认值取决于writeConcernMajorityJournalDefault
的值,它默认为true,即j: true
确认时要求对磁盘上日志进行写入操作。
w: 数据写入到number个节点才向用客户端确认
{w: 0}
:对客户端的写入不需要发送任何确认,适用于性能要求高,但不关注正确性的场景。{w: 1}
:默认的writeConcern,数据写入到Primary就向客户端发送确认。{w: “majority”}
:数据写入到副本集大多数成员后向客户端发送确认,适用于对数据安全性要求比较高的场景,该选项会降低写入性能。j: 写入操作的journal持久化后才向客户端确认
j
未指定:确认取决于j: true
时要求对磁盘上日志进行写入操作,默认为j: true
。j: true
:确认时要求对磁盘上日志进行写入操作。j: false
:确认要求在内存中进行写入操作。wtimeout: 写入超时时间,仅w的值大于1时有效。
{w: }
时,数据需要成功写入number
个节点才算成功。如果写入过程中有节点故障,可能导致这个条件一直不能满足,从而一直不能向客户端发送确认结果,针对这种情况,客户端可设置wtimeout
选项来指定超时时间,当写入过程持续超过该时间仍未结束,则认为写入失败。readPreFerence决定使用哪一个节点进行读请求。
可选值包括:
通过MongoDB的连接串参数:
# 连接参数后面添加readPreference=secondary 表示从备节点读取数据mongodb://host1:27107,host2:27107,host3:27017/?replicaSet=rs0&readPreference=secondary
通过MongoDB 驱动程序 API:
MongoCollection.withReadPreference(ReadPreference readPref)
通过Mongo Shell:
db.collection.find().readPref( "secondary" )
[root@localhost ~]# mongosh --host rs0/localhost:28017 -u hushang -p 123456rs0 [primary] test> db.user.insertOne({ name: "hs2"},{ writeConcern: { w: 1}})
db.fsyncLock()
来锁定写入(模拟同步延迟):rs0 [direct: secondary] test> db.fsyncLock()
rs0 [primary] test> db.user.insert({ name: "hs3"},{ writeConcern: { w: 1}})
rs0 [primary] test> db.user.find().readPref("secondary")
rs0 [direct: secondary] test> db.fsyncUnlock()
rs0 [primary] test> db.user.find().readPref("secondary")
readPreference 只能控制使用一类节点。Tag 则可以将节点选择控制到一个或几个节点。
考虑以下场景:
可以使用Tag来达到这样的控制目的:
{purpose: “online”}
;{purpose: “analyze”}
;在 readPreference 选择了指定的节点后,readConcern 决定这个节点上的数据哪些是可读的,类似于关系数据库的隔离级别。可选值包括:
在复制集中 local 和 available 是没有区别的,两者的区别主要体现在分片集上。
当发生数据迁移时,分片A把一部分数据迁移至分片B,迁移进行中此时分片B存在了一部分的数据。当readConcern为available时这一部分数据是能读取到的,当readConcern为local时这一部分数据读取不到。
只读取大多数数据节点上都提交了的数据。考虑如下场景:
对于readConcern为majority时,primary必须要到t3时刻才能读取到{x: 1}的值;secondary1必须要到t5时刻才能读取到{x: 1}的值;secondary必须要到t6时刻。
节点上维护多个x版本(类似于MVCC机制),MongoDB通过维护多个快照来链接不同的版本:
db.fsyncLock()
锁住写入(模拟同步延迟):rs0 [direct: secondary] test> db.fsyncLock()
rs0 [primary] test> db.user.insert({ name: "hs"},{ writeConcern: { w: 1}})
rs0 [primary] test> db.user.find().readConcern("local")[ { _id: ObjectId("66a9bc98765490df764272fc"), name: 'hs' } ]
rs0 [primary] test> db.user.find().readConcern("majority")rs0 [primary] test>
rs0 [direct: secondary] test> db.fsyncUnlock()
rs0 [primary] test> db.user.find().readConcern("majority")
MongoDB中的回滚:
所以从分布式系统的角度来看,事务的提交被提升到了分布式集群的多个节点级别的“提交”,而不再是单个节点上的“提交”。
在可能发生回滚的前提下考虑脏读问题:
{readConcern: “majority”}
可以有效避免脏读。考虑如下场景:
思考:如何保证自己能够读到刚刚写入的数据?
{writeConcern: “majority”}
和{readConcern: “majority”}
可以解决。db.orders.insert({ oid:101,sku:"kite",q:1})db.orders.find({ oid:101}).readPref("secondary")
{writeConcern: “majority”}
和{readConcern: “majority”}
:db.orders.insert({ oid:101,sku:"kite",q:1},{ writeConcern:{ w:"majority"}}) db.orders.find({ oid:101}).readPref("secondary").readConcern("majority")
只读取大多数节点确认过的数据。和 majority 最大差别是保证绝对的操作线性顺序。
maxTimeMS
。{readConcern: “snapshot”}
只在多文档事务中生效。
将一个事务的 readConcern 设置为 snapshot,将保证在事务中的读:
因为所有的读都将使用同一个快照,直到事务提交为止该快照才被释放。
开启事务语句:
var session = db.getMongo().startSession()session.startTransaction({ readConcern: { level: "majority"}, writeConcern: { w: "majority"}})var coll = session.getDatabase("数据库名").getCollection("集合名")
事务完成前,事务外的操作对该事务所做的修改不可访问:
db.tx.insertMany([{ x: 1 }, { x: 2 }]) var session = db.getMongo().startSession()session.startTransaction()var coll = session.getDatabase("test").getCollection("tx")coll.updateOne({ x: 1}, { $set: { y: 1}})coll.findOne({ x: 1}) //{ x:1, y:1}db.tx.findOne({ x: 1}) //{ x:1}session.commitTransaction()
如果事务内使用{readConcern: “snapshot”}
,则可以达到可重复读 Repeatable Read。
在执行事务的过程中,如果操作太多,或者存在一些长时间的等待,则可能会产生如下异常:
rs0 [direct: primary] test> session.commitTransaction()MongoServerError: Transaction with { txnNumber: 1 } has been aborted.
原因在于,默认情况下MongoDB会为每个事务设置1分钟的超时时间,如果在该时间内没有提交,就会强制将其终止。该超时时间可以通过transactionLifetimeLimitSecond
变量设定。
var session = db.getMongo().startSession()session.startTransaction({ readConcern: { level: "majority"}, writeConcern: { w: "majority"}})var coll = session.getDatabase('test').getCollection("tx")
coll.update({ _id: ObjectId("66a9bfdb765490df764272fd")},{ $set: { y: 2}})
db.tx.update({ _id: ObjectId("66a9bfdb765490df764272fd")},{ $set: { y: 3}})db.tx.find()coll.find()
session.commitTransaction()
int i = 1/0;
注释掉,则能看到正常事务提交的执行结果。import com.mongodb.ReadConcern;import com.mongodb.ReadPreference;import com.mongodb.TransactionOptions;import com.mongodb.WriteConcern;import com.mongodb.client.ClientSession;import com.mongodb.client.MongoClient;import com.mongodb.client.MongoClients;import com.mongodb.client.MongoCollection;import com.mongodb.client.model.Filters;import com.mongodb.client.model.Updates;import org.bson.Document;import org.junit.jupiter.api.Test;
@Testpublic void updateTest() { // 连接复制集 MongoClient mongoClient = MongoClients.create("mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0"); // 获取两个collection集合对象 MongoCollection empCollection = mongoClient.getDatabase("test").getCollection("emp"); MongoCollection eventsCollection = mongoClient.getDatabase("test").getCollection("events"); // 事务操作配置 TransactionOptions transactionOptions = TransactionOptions.builder() .writeConcern(WriteConcern.MAJORITY) .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.MAJORITY) .build(); try (ClientSession clientSession = mongoClient.startSession()) { // 开启事务 clientSession.startTransaction(); try { empCollection.updateOne(clientSession, Filters.eq("_id", "66a9dbb8bb75fd99739d6aea"), Updates.set("name", "hushang")); int i = 1 / 0; eventsCollection.insertOne(clientSession, new Document("username", "hs1").append("status", new Document("new", "inactive").append("old", "Active"))); // 提交事务 clientSession.commitTransaction(); } catch (Exception e) { // 回滚事务 clientSession.abortTransaction(); } }}
int i = 1/0;
注释掉,则能看到正常事务提交的执行结果。package com.hs.learn.config;import com.mongodb.ReadConcern;import com.mongodb.ReadPreference;import com.mongodb.TransactionOptions;import com.mongodb.WriteConcern;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.mongodb.MongoDatabaseFactory;import org.springframework.data.mongodb.MongoTransactionManager;
@Configurationpublic class MongodbConfig { @Bean public MongoTransactionManager transactionManager(MongoDatabaseFactory factory) { TransactionOptions transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .build(); return new MongoTransactionManager(factory, transactionOptions); }}
package com.hs.learn.service;import com.hs.learn.entity.Employee;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.util.Date;
@Service@Transactional(rollbackFor = Exception.class)public class EmployeeService { @Autowired private MongoTemplate mongoTemplate; public void addEmployee() { Employee employee1 = new Employee(1, "hushang1", 25, new Date(), "测试数据1"); Employee employee2 = new Employee(2, "hushang2", 25, new Date(), "测试数据2"); mongoTemplate.insert(employee1); int i = 1 / 0; mongoTemplate.insert(employee2); }}
@Autowiredprivate EmployeeService employeeService;@Testpublic void test() { employeeService.addEmployee();}
转载地址:http://unffk.baihongyu.com/