-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathOptimisticTxnExample.java
More file actions
160 lines (135 loc) · 7.15 KB
/
OptimisticTxnExample.java
File metadata and controls
160 lines (135 loc) · 7.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package com.pingcap.txn;
import com.zaxxer.hikari.HikariDataSource;
import java.math.BigDecimal;
import java.sql.*;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Example of two users buying the same book concurrently, each inside a
 * transaction started with {@code begin optimistic}.
 *
 * <p>{@link #main(String[])} inserts one book and two users, then runs one
 * {@link #buy} call per user on a two-thread pool. When a purchase fails with
 * one of the error codes listed in {@link #isRetryable(int)}, the whole
 * purchase is attempted again until the retry budget is used up.
 */
public class OptimisticTxnExample {

    /**
     * Entry point.
     *
     * <p>Recognised arguments: {@code ALICE_NUM=<n>} and {@code BOB_NUM=<n>},
     * the number of books Alice and Bob try to buy. Both default to 0.
     *
     * @param args command-line arguments
     * @throws SQLException if preparing the sample data fails
     * @throws InterruptedException if interrupted while waiting for the buyers
     */
    public static void main(String[] args) throws SQLException, InterruptedException {
        System.out.println(Arrays.toString(args));
        int aliceQuantity = 0;
        int bobQuantity = 0;

        for (String arg : args) {
            if (arg.startsWith("ALICE_NUM")) {
                aliceQuantity = Integer.parseInt(arg.replace("ALICE_NUM=", ""));
            }
            if (arg.startsWith("BOB_NUM")) {
                bobQuantity = Integer.parseInt(arg.replace("BOB_NUM=", ""));
            }
        }

        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:mysql://localhost:4000/bookshop?useServerPrepStmts=true&cachePrepStmts=true");
        ds.setUsername("root");
        ds.setPassword("");

        // Prepare data. The connection is handed back to the pool as soon as
        // the rows are inserted so that it does not occupy a pool slot while
        // the buyer threads are running.
        try (Connection connection = ds.getConnection()) {
            createBook(connection, 1L, "Designing Data-Intensive Application", "Science & Technology",
                    Timestamp.valueOf("2018-09-01 00:00:00"), new BigDecimal(100), 10);
            createUser(connection, 1L, "Bob", new BigDecimal(10000));
            createUser(connection, 2L, "Alice", new BigDecimal(10000));
        }

        CountDownLatch countDownLatch = new CountDownLatch(2);
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        final int finalBobQuantity = bobQuantity;
        threadPool.execute(() -> {
            try {
                buy(ds, 1, 1000L, 1L, 1L, finalBobQuantity, 5);
            } finally {
                // Always release the latch, even if buy() throws unexpectedly.
                countDownLatch.countDown();
            }
        });

        final int finalAliceQuantity = aliceQuantity;
        threadPool.execute(() -> {
            try {
                buy(ds, 2, 1001L, 1L, 2L, finalAliceQuantity, 5);
            } finally {
                countDownLatch.countDown();
            }
        });

        // Wait at most 5 seconds for both buyers, then terminate the JVM
        // (which also stops the pool threads).
        countDownLatch.await(5, TimeUnit.SECONDS);
        System.exit(0);
    }

    /**
     * Inserts one row into the {@code users} table.
     *
     * @param connection connection to run the insert on; not closed by this method
     * @param id user id
     * @param nickname user nickname
     * @param balance initial balance
     * @throws SQLException if the insert fails
     */
    public static void createUser(Connection connection, Long id, String nickname, BigDecimal balance) throws SQLException {
        try (PreparedStatement insert = connection.prepareStatement(
                "INSERT INTO `users` (`id`, `nickname`, `balance`) VALUES (?, ?, ?)")) {
            insert.setLong(1, id);
            insert.setString(2, nickname);
            insert.setBigDecimal(3, balance);
            insert.executeUpdate();
        }
    }

    /**
     * Inserts one row into the {@code books} table.
     *
     * @param connection connection to run the insert on; not closed by this method
     * @param id book id
     * @param title book title
     * @param type book type
     * @param publishedAt publication timestamp
     * @param price unit price
     * @param stock number of copies in stock
     * @throws SQLException if the insert fails
     */
    public static void createBook(Connection connection, Long id, String title, String type, Timestamp publishedAt, BigDecimal price, Integer stock) throws SQLException {
        try (PreparedStatement insert = connection.prepareStatement(
                "INSERT INTO `books` (`id`, `title`, `type`, `published_at`, `price`, `stock`) values (?, ?, ?, ?, ?, ?)")) {
            insert.setLong(1, id);
            insert.setString(2, title);
            insert.setString(3, type);
            insert.setTimestamp(4, publishedAt);
            insert.setBigDecimal(5, price);
            insert.setInt(6, stock);
            insert.executeUpdate();
        }
    }

    /**
     * Tries to buy {@code quantity} copies of a book for a user, retrying the
     * whole transaction when it fails with a retryable error code.
     *
     * <p>Each attempt borrows its own connection and returns it to the pool
     * before the next attempt starts, so a caller thread never holds more
     * than one pooled connection at a time. Failures are reported on the
     * console; this method does not throw for SQL errors.
     *
     * @param ds data source to borrow connections from
     * @param threadID identifier embedded as a comment in every SQL statement
     * @param orderID id of the order row to insert
     * @param bookID id of the book to buy
     * @param userID id of the buying user
     * @param quantity number of copies to buy
     * @param retryTimes maximum number of retries after the first attempt;
     *                   zero or a negative value means no retry
     */
    public static void buy (HikariDataSource ds, Integer threadID, Long orderID, Long bookID,
                            Long userID, Integer quantity, Integer retryTimes) {
        int retriesLeft = retryTimes;
        while (true) {
            SQLException retryableFailure = attemptPurchase(ds, threadID, orderID, bookID, userID, quantity);
            if (retryableFailure == null || retriesLeft <= 0) {
                return;
            }
            System.out.println("rest " + retriesLeft + " times. retry for " + retryableFailure.getMessage());
            retriesLeft--;
        }
    }

    /**
     * Runs a single purchase attempt on a freshly borrowed connection.
     *
     * @return the exception that made the attempt fail if its error code is
     *         retryable; {@code null} if the attempt succeeded or failed in a
     *         way that must not be retried
     */
    private static SQLException attemptPurchase(HikariDataSource ds, Integer threadID, Long orderID,
                                                Long bookID, Long userID, Integer quantity) {
        String txnComment = "/* txn " + threadID + " */ ";
        System.out.printf("\nuser %d try to buy %d books(id: %d)\n", userID, quantity, bookID);

        try (Connection connection = ds.getConnection()) {
            try {
                runPurchaseTransaction(connection, txnComment, orderID, bookID, userID, quantity);
                System.out.printf("\nuser %d buy %d books(id: %d) successfully\n", userID, quantity, bookID);
                return null;
            } catch (Exception e) {
                // A failed rollback surfaces as SQLException and is handled
                // by the outer catch; no retry is attempted in that case.
                executeStatement(connection, txnComment + "rollback");
                System.out.printf("\nuser %d fail to buy %d books(id: %d) because %s error\n", userID, quantity, bookID, e.getMessage());
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                if (e instanceof SQLException sqlException && isRetryable(sqlException.getErrorCode())) {
                    return sqlException;
                }
                return null;
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Executes the purchase as one transaction: read the book, decrease its
     * stock, insert the order, charge the user, commit.
     *
     * @throws SQLException if any statement, including the commit, fails
     * @throws InterruptedException if interrupted during the one-second pause
     * @throws RuntimeException if the book does not exist or the stock read
     *                          is smaller than {@code quantity}
     */
    private static void runPurchaseTransaction(Connection connection, String txnComment, Long orderID,
                                               Long bookID, Long userID, Integer quantity)
            throws SQLException, InterruptedException {
        connection.setAutoCommit(false);
        executeStatement(connection, txnComment + "begin optimistic");

        // Give the other thread time to run its own 'begin optimistic'
        // statement so that both transactions are open at the same time.
        TimeUnit.SECONDS.sleep(1);

        // read price of book
        BigDecimal price;
        try (PreparedStatement selectBook = connection.prepareStatement(
                txnComment + "select * from books where id = ? for update")) {
            selectBook.setLong(1, bookID);
            try (ResultSet res = selectBook.executeQuery()) {
                if (!res.next()) {
                    throw new RuntimeException("book not exist");
                }
                price = res.getBigDecimal("price");
                int stock = res.getInt("stock");
                if (stock < quantity) {
                    throw new RuntimeException("book not enough");
                }
            }
        }

        // update book
        String updateBookSQL = "update `books` set stock = stock - ? where id = ? and stock - ? >= 0";
        try (PreparedStatement updateBook = connection.prepareStatement(txnComment + updateBookSQL)) {
            updateBook.setInt(1, quantity);
            updateBook.setLong(2, bookID);
            updateBook.setInt(3, quantity);
            updateBook.executeUpdate();
        }

        // insert order
        String insertOrderSQL = "insert into `orders` (`id`, `book_id`, `user_id`, `quality`) values (?, ?, ?, ?)";
        try (PreparedStatement insertOrder = connection.prepareStatement(txnComment + insertOrderSQL)) {
            insertOrder.setLong(1, orderID);
            insertOrder.setLong(2, bookID);
            insertOrder.setLong(3, userID);
            insertOrder.setInt(4, quantity);
            insertOrder.executeUpdate();
        }

        // update user
        String updateUserSQL = "update `users` set `balance` = `balance` - ? where id = ?";
        try (PreparedStatement updateUser = connection.prepareStatement(txnComment + updateUserSQL)) {
            updateUser.setBigDecimal(1, price.multiply(new BigDecimal(quantity)));
            updateUser.setLong(2, userID);
            updateUser.executeUpdate();
        }

        executeStatement(connection, txnComment + "commit");
    }

    /** Runs one SQL statement that takes no parameters and closes its statement handle. */
    private static void executeStatement(Connection connection, String sql) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.executeUpdate(sql);
        }
    }

    /**
     * Tells whether a failed purchase with the given vendor error code should
     * be retried.
     *
     * <p>You can get all error codes at https://docs.pingcap.com/tidb/stable/error-codes
     */
    private static boolean isRetryable(int errorCode) {
        return switch (errorCode) {
            case 9007, // Transactions in TiKV encounter write conflicts.
                 8028, // table schema changes
                 8002, // "SELECT FOR UPDATE" commit conflict
                 8022  // The transaction commit fails and has been rolled back
                    -> true;
            default -> false;
        };
    }
}