分布式事务是在多个不同的数据源或数据库节点上执行的事务,并且需要保证这些事务具有原子性、一致性、隔离性和持久性(ACID特性)。在基于MySQL的系统中,常见的分布式事务解决方案包括两阶段提交协议(2PC)、三阶段提交协议(3PC)、基于消息中间件的事务(如RocketMQ事务消息),以及TCC(Try-Confirm-Cancel)等。下面是一个基于MySQL的分布式事务解决方案的详细介绍和实现:
两阶段提交协议(2PC)是一种经典的分布式事务处理协议,分为两个阶段:准备阶段和提交阶段。
mkdir distributed-tx-example
cd distributed-tx-example
npm init -y
npm install mysql2 express body-parser
-- 数据库1:db1
CREATE DATABASE db1;
USE db1;
CREATE TABLE accounts (
id INT PRIMARY KEY AUTO_INCREMENT,
balance DECIMAL(10, 2) NOT NULL
);
-- 数据库2:db2
CREATE DATABASE db2;
USE db2;
CREATE TABLE transactions (
id INT PRIMARY KEY AUTO_INCREMENT,
account_id INT NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status ENUM('PENDING', 'COMMITTED', 'ROLLED_BACK') NOT NULL
);
在server.js
中实现两阶段提交协议:
const express = require('express');
const bodyParser = require('body-parser');
const mysql = require('mysql2/promise');
const app = express();
app.use(bodyParser.json());
const db1 = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'db1',
});
const db2 = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'db2',
});
app.post('/transfer', async (req, res) => {
const { fromAccountId, toAccountId, amount } = req.body;
const conn1 = await db1.getConnection();
const conn2 = await db2.getConnection();
try {
await conn1.beginTransaction();
await conn2.beginTransaction();
const [fromAccount] = await conn1.query('SELECT balance FROM accounts WHERE id = ?', [fromAccountId]);
if (fromAccount[0].balance < amount) {
throw new Error('Insufficient funds');
}
await conn1.query('UPDATE accounts SET balance = balance - ? WHERE id = ?', [amount, fromAccountId]);
await conn2.query('INSERT INTO transactions (account_id, amount, status) VALUES (?, ?, ?)', [toAccountId, amount, 'PENDING']);
await conn1.commit();
await conn2.commit();
// 第二阶段提交
await conn2.query('UPDATE transactions SET status = ? WHERE account_id = ? AND amount = ?', ['COMMITTED', toAccountId, amount]);
res.json({ message: 'Transfer successful' });
} catch (error) {
await conn1.rollback();
await conn2.rollback();
// 第二阶段回滚
await conn2.query('UPDATE transactions SET status = ? WHERE account_id = ? AND amount = ?', ['ROLLED_BACK', toAccountId, amount]);
res.status(500).json({ message: 'Transfer failed', error: error.message });
} finally {
conn1.release();
conn2.release();
}
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`);
});
启动服务器并测试转账API:
node server.js
使用Postman或curl
测试转账API:
curl -X POST http://localhost:3000/transfer -H "Content-Type: application/json" -d '{"fromAccountId": 1, "toAccountId": 2, "amount": 100.00}'
TCC(Try-Confirm-Cancel)是一种柔性事务模型,通过将业务操作拆分为三个步骤来保证分布式事务的最终一致性:
假设我们有两个服务,一个处理账户操作,另一个处理交易记录。
const express = require('express');
const bodyParser = require('body-parser');
const mysql = require('mysql2/promise');
const app = express();
app.use(bodyParser.json());
const accountDb = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'db1',
});
const transactionDb = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'db2',
});
// 账户服务
app.post('/account/try', async (req, res) => {
const { accountId, amount } = req.body;
const conn = await accountDb.getConnection();
try {
await conn.beginTransaction();
const [account] = await conn.query('SELECT balance FROM accounts WHERE id = ?', [accountId]);
if (account[0].balance < amount) {
throw new Error('Insufficient funds');
}
await conn.query('UPDATE accounts SET balance = balance - ? WHERE id = ?', [amount, accountId]);
await conn.commit();
res.json({ message: 'Try successful' });
} catch (error) {
await conn.rollback();
res.status(500).json({ message: 'Try failed', error: error.message });
} finally {
conn.release();
}
});
app.post('/account/confirm', async (req, res) => {
const { accountId, amount } = req.body;
// 确认操作逻辑
res.json({ message: 'Confirm successful' });
});
app.post('/account/cancel', async (req, res) => {
const { accountId, amount } = req.body;
const conn = await accountDb.getConnection();
try {
await conn.beginTransaction();
await conn.query('UPDATE accounts SET balance = balance + ? WHERE id = ?', [amount, accountId]);
await conn.commit();
res.json({ message: 'Cancel successful' });
} catch (error) {
await conn.rollback();
res.status(500).json({ message: 'Cancel failed', error: error.message });
} finally {
conn.release();
}
});
// 交易服务
app.post('/transaction/try', async (req, res) => {
const { transactionId, accountId, amount } = req.body;
const conn = await transactionDb.getConnection();
try {
await conn.beginTransaction();
await conn.query('INSERT INTO transactions (id, account_id, amount, status) VALUES (?, ?, ?, ?)', [transactionId, accountId, amount, 'PENDING']);
await conn.commit();
res.json({ message: 'Try successful' });
} catch (error) {
await conn.rollback();
res.status(500).json({ message: 'Try failed', error: error.message });
} finally {
conn.release();
}
});
app.post('/transaction/confirm', async (req, res) => {
const { transactionId } = req.body;
const conn = await transactionDb.getConnection();
try {
await conn.beginTransaction();
await conn.query('UPDATE transactions SET status = ? WHERE id = ?', ['COMMITTED', transactionId]);
await conn.commit();
res.json({ message: 'Confirm successful' });
} catch (error) {
await conn.rollback();
res.status(500).json({ message: 'Confirm failed', error: error.message });
} finally {
conn.release();
}
});
app.post('/transaction/cancel', async (req, res) => {
const { transactionId } = req.body;
const conn = await transactionDb.getConnection();
try {
await conn.beginTransaction();
await conn.query('UPDATE transactions SET status = ? WHERE id = ?', ['ROLLED_BACK', transactionId]);
await conn.commit();
res.json({ message: 'Cancel successful' });
} catch (error) {
await conn.rollback();
res.status(500).json({ message: 'Cancel failed', error: error.message });
} finally {
conn.release();
}
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`);
});
启动