MySQL的分布式事务解决方案

person 花一样的女孩    watch_later 2024-07-12 22:51:29
visibility 266    class 分布式事务,MySQL    bookmark 分享

分布式事务是在多个不同的数据源或数据库节点上执行的事务,并且需要保证这些事务具有原子性、一致性、隔离性和持久性(ACID特性)。在基于MySQL的系统中,常见的分布式事务解决方案包括两阶段提交协议(2PC)、三阶段提交协议(3PC)、基于消息中间件的事务(如RocketMQ事务消息),以及TCC(Try-Confirm-Cancel)等。下面是一个基于MySQL的分布式事务解决方案的详细介绍和实现:

1. 两阶段提交协议(2PC)

概念

两阶段提交协议(2PC)是一种经典的分布式事务处理协议,分为两个阶段:准备阶段和提交阶段。

  • 准备阶段:协调者向所有参与者发送请求,询问他们是否可以提交事务,并等待所有参与者的回复。
  • 提交阶段:如果所有参与者都同意提交事务,协调者向所有参与者发送提交请求;如果有任何一个参与者不同意提交,协调者向所有参与者发送回滚请求。

实现步骤

  1. 初始化项目并安装依赖
mkdir distributed-tx-example
cd distributed-tx-example
npm init -y
npm install mysql2 express body-parser
  1. 创建数据库和表
-- 数据库1:db1
CREATE DATABASE db1;
USE db1;
CREATE TABLE accounts (
    id INT PRIMARY KEY AUTO_INCREMENT,
    balance DECIMAL(10, 2) NOT NULL
);

-- 数据库2:db2
CREATE DATABASE db2;
USE db2;
CREATE TABLE transactions (
    id INT PRIMARY KEY AUTO_INCREMENT,
    account_id INT NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    status ENUM('PENDING', 'COMMITTED', 'ROLLED_BACK') NOT NULL
);
  1. 实现两阶段提交协议

server.js中实现两阶段提交协议:

const express = require('express');
const bodyParser = require('body-parser');
const mysql = require('mysql2/promise');

const app = express();
app.use(bodyParser.json());

const db1 = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'db1',
});

const db2 = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'db2',
});

app.post('/transfer', async (req, res) => {
    const { fromAccountId, toAccountId, amount } = req.body;

    const conn1 = await db1.getConnection();
    const conn2 = await db2.getConnection();

    try {
        await conn1.beginTransaction();
        await conn2.beginTransaction();

        const [fromAccount] = await conn1.query('SELECT balance FROM accounts WHERE id = ?', [fromAccountId]);
        if (fromAccount[0].balance < amount) {
            throw new Error('Insufficient funds');
        }

        await conn1.query('UPDATE accounts SET balance = balance - ? WHERE id = ?', [amount, fromAccountId]);
        await conn2.query('INSERT INTO transactions (account_id, amount, status) VALUES (?, ?, ?)', [toAccountId, amount, 'PENDING']);

        await conn1.commit();
        await conn2.commit();

        // 第二阶段提交
        await conn2.query('UPDATE transactions SET status = ? WHERE account_id = ? AND amount = ?', ['COMMITTED', toAccountId, amount]);

        res.json({ message: 'Transfer successful' });
    } catch (error) {
        await conn1.rollback();
        await conn2.rollback();

        // 第二阶段回滚
        await conn2.query('UPDATE transactions SET status = ? WHERE account_id = ? AND amount = ?', ['ROLLED_BACK', toAccountId, amount]);

        res.status(500).json({ message: 'Transfer failed', error: error.message });
    } finally {
        conn1.release();
        conn2.release();
    }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
    console.log(`Server is running on port ${PORT}`);
});

运行和测试

启动服务器并测试转账API:

node server.js

使用Postman或curl测试转账API:

curl -X POST http://localhost:3000/transfer -H "Content-Type: application/json" -d '{"fromAccountId": 1, "toAccountId": 2, "amount": 100.00}'

2. TCC(Try-Confirm-Cancel)模式

TCC(Try-Confirm-Cancel)是一种柔性事务模型,通过将业务操作拆分为三个步骤来保证分布式事务的最终一致性:

  1. Try:尝试执行业务操作,预留资源。
  2. Confirm:确认业务操作,真正执行操作。
  3. Cancel:取消业务操作,释放预留资源。

实现步骤

假设我们有两个服务,一个处理账户操作,另一个处理交易记录。

  1. 账户服务:负责处理账户余额的预留和确认。
  2. 交易服务:负责记录交易并确认交易状态。
const express = require('express');
const bodyParser = require('body-parser');
const mysql = require('mysql2/promise');

const app = express();
app.use(bodyParser.json());

const accountDb = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'db1',
});

const transactionDb = mysql.createPool({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'db2',
});

// 账户服务
app.post('/account/try', async (req, res) => {
    const { accountId, amount } = req.body;

    const conn = await accountDb.getConnection();
    try {
        await conn.beginTransaction();

        const [account] = await conn.query('SELECT balance FROM accounts WHERE id = ?', [accountId]);
        if (account[0].balance < amount) {
            throw new Error('Insufficient funds');
        }

        await conn.query('UPDATE accounts SET balance = balance - ? WHERE id = ?', [amount, accountId]);

        await conn.commit();
        res.json({ message: 'Try successful' });
    } catch (error) {
        await conn.rollback();
        res.status(500).json({ message: 'Try failed', error: error.message });
    } finally {
        conn.release();
    }
});

app.post('/account/confirm', async (req, res) => {
    const { accountId, amount } = req.body;

    // 确认操作逻辑

    res.json({ message: 'Confirm successful' });
});

app.post('/account/cancel', async (req, res) => {
    const { accountId, amount } = req.body;

    const conn = await accountDb.getConnection();
    try {
        await conn.beginTransaction();

        await conn.query('UPDATE accounts SET balance = balance + ? WHERE id = ?', [amount, accountId]);

        await conn.commit();
        res.json({ message: 'Cancel successful' });
    } catch (error) {
        await conn.rollback();
        res.status(500).json({ message: 'Cancel failed', error: error.message });
    } finally {
        conn.release();
    }
});

// 交易服务
app.post('/transaction/try', async (req, res) => {
    const { transactionId, accountId, amount } = req.body;

    const conn = await transactionDb.getConnection();
    try {
        await conn.beginTransaction();

        await conn.query('INSERT INTO transactions (id, account_id, amount, status) VALUES (?, ?, ?, ?)', [transactionId, accountId, amount, 'PENDING']);

        await conn.commit();
        res.json({ message: 'Try successful' });
    } catch (error) {
        await conn.rollback();
        res.status(500).json({ message: 'Try failed', error: error.message });
    } finally {
        conn.release();
    }
});

app.post('/transaction/confirm', async (req, res) => {
    const { transactionId } = req.body;

    const conn = await transactionDb.getConnection();
    try {
        await conn.beginTransaction();

        await conn.query('UPDATE transactions SET status = ? WHERE id = ?', ['COMMITTED', transactionId]);

        await conn.commit();
        res.json({ message: 'Confirm successful' });
    } catch (error) {
        await conn.rollback();
        res.status(500).json({ message: 'Confirm failed', error: error.message });
    } finally {
        conn.release();
    }
});

app.post('/transaction/cancel', async (req, res) => {
    const { transactionId } = req.body;

    const conn = await transactionDb.getConnection();
    try {
        await conn.beginTransaction();

        await conn.query('UPDATE transactions SET status = ? WHERE id = ?', ['ROLLED_BACK', transactionId]);

        await conn.commit();
        res.json({ message: 'Cancel successful' });
    } catch (error) {
        await conn.rollback();
        res.status(500).json({ message: 'Cancel failed', error: error.message });
    } finally {
        conn.release();
    }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
    console.log(`Server is running on port ${PORT}`);
});

运行和测试

启动

评论区
评论列表
menu