-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathEffectWriteSkew.java
More file actions
138 lines (119 loc) · 5.34 KB
/
EffectWriteSkew.java
File metadata and controls
138 lines (119 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.pingcap;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* EffectWriteSkew
*
* @author Icemap
* @date 2022/4/25
*/
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
System.exit(0);
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN pessimistic");
// Txn 1 should be waiting until txn 2 is done.
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 is done. Let txn 1 run again.
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}