Intro
As promised, I've put together a complete example. I used MySQL and created three tables (test1, test2 and test3), each defined like this:
CREATE TABLE `test{1,2,3}` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `data` varchar(255) NOT NULL UNIQUE,
    PRIMARY KEY (`id`)
);
test2 contains a single row initially.
INSERT INTO `test2` (`data`) VALUES ('a');
(I've posted the full code to http://pastebin.com.)
The following example does several things:
- Sets `threads` to 3, which determines how many jobs run in parallel.
- Creates `threads` connections, one per job.
- Generates sample data for every table (by default the data is `a` for every table).
- Creates `threads` jobs and loads them with data.
- Runs the jobs in `threads` threads and waits for their completion (successful or not).
- If no exception occurred, commits every connection; otherwise it rolls back each of them.
- Closes the connections (they could be reused instead).
(Note that I've used Java 7's automatic resource management feature, try-with-resources, in SQLTask.call().)
Logic
public static void main(String[] args) throws SQLException, InterruptedException {
    int threads = 3;
    List<Connection> connections = getConnections(threads);
    Map<String, String> tableData = getTableData(threads);
    List<SQLTask> tasks = getTasks(threads, connections);
    setData(tableData, tasks);
    try {
        runTasks(tasks);
        commitConnections(connections);
    } catch (ExecutionException ex) {
        rollbackConnections(connections);
    } finally {
        closeConnections(connections);
    }
}
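The helper methods called in main (getConnections, commitConnections, rollbackConnections and closeConnections) are not shown here. A minimal sketch of what they might look like follows; the class name ConnectionHelpers and the URL, user and password values are placeholders of mine, not part of the original code. The one detail that matters is that auto-commit is switched off, since otherwise every INSERT would be committed immediately and the later rollback would have nothing to undo.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers: the original post calls these but does not show them.
final class ConnectionHelpers {
    // Placeholder connection settings (assumptions); replace with your own.
    static final String URL = "jdbc:mysql://localhost:3306/test";
    static final String USER = "user";
    static final String PASSWORD = "password";

    // Opens `count` connections with auto-commit disabled, so nothing
    // becomes permanent until commit() is called explicitly.
    static List<Connection> getConnections(int count) throws SQLException {
        List<Connection> connections = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
                connections.add(connection);
                connection.setAutoCommit(false);
            }
        } catch (SQLException ex) {
            closeConnections(connections); // don't leak the ones already opened
            throw ex;
        }
        return connections;
    }

    // Commits the connections one after another (not atomically as a group).
    static void commitConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections)
            connection.commit();
    }

    static void rollbackConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections)
            connection.rollback();
    }

    // Closes every connection, even if closing one of them fails.
    static void closeConnections(List<Connection> connections) {
        for (Connection connection : connections) {
            try {
                connection.close();
            } catch (SQLException ex) {
                // Nothing sensible to do here; keep closing the rest.
            }
        }
    }
}
```

Keep in mind that the commits in this sketch happen sequentially: if the second commit() fails, the first one is already permanent.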
Data
private static Map<String, String> getTableData(int threads) {
    Map<String, String> tableData = new HashMap<>();
    for (int i = 1; i <= threads; i++)
        tableData.put("test" + i, "a");
    return tableData;
}
Tasks
private static final class SQLTask implements Callable<Void> {
    private final Connection connection;
    private String data;
    private String table;

    public SQLTask(Connection connection) {
        this.connection = connection;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public Void call() throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.executeUpdate(String.format(
                    "INSERT INTO `%s` (data) VALUES ('%s');", table, data));
        }
        return null;
    }
}
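SQLTask.call() builds its statement with String.format, which is fine for a demo with fixed data but breaks (or becomes injectable) as soon as the data contains a quote. Below is a sketch of a variant that binds the value through a PreparedStatement instead. The class name PreparedSQLTask, the insertSql helper and the rule for what counts as a valid table name are my own assumptions; identifiers such as table names cannot be bound as parameters, so the sketch validates the name before placing it into the statement text.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;

// Hypothetical variant of SQLTask; not part of the original example.
final class PreparedSQLTask implements Callable<Void> {
    // Assumption: table names consist of letters, digits and underscores only.
    private static final Pattern TABLE_NAME = Pattern.compile("[A-Za-z0-9_]+");

    private final Connection connection;
    private final String table;
    private final String data;

    PreparedSQLTask(Connection connection, String table, String data) {
        this.connection = connection;
        this.table = table;
        this.data = data;
    }

    // Validates the table name, then builds the statement text with a
    // placeholder for the value.
    static String insertSql(String table) {
        if (table == null || !TABLE_NAME.matcher(table).matches())
            throw new IllegalArgumentException("Invalid table name: " + table);
        return "INSERT INTO `" + table + "` (`data`) VALUES (?)";
    }

    @Override
    public Void call() throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(insertSql(table))) {
            statement.setString(1, data); // the driver handles quoting and escaping
            statement.executeUpdate();
        }
        return null;
    }
}
```

Making the fields final and passing them to the constructor also removes the need for the separate setData step, although that is a matter of taste.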
private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
    List<SQLTask> tasks = new ArrayList<>();
    for (int i = 0; i < threads; i++)
        tasks.add(new SQLTask(connections.get(i)));
    return tasks;
}
private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
    // Pairs each table with one task. A HashMap has no guaranteed iteration
    // order, so which task gets which table is arbitrary.
    Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
    Iterator<SQLTask> j = tasks.iterator();
    while (i.hasNext() && j.hasNext()) {
        Entry<String, String> entry = i.next();
        SQLTask task = j.next();
        task.setTable(entry.getKey());
        task.setData(entry.getValue());
    }
}
Run
private static void runTasks(List<SQLTask> tasks)
        throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
    List<Future<Void>> futures = executorService.invokeAll(tasks);
    executorService.shutdown();
    for (Future<Void> future : futures)
        future.get();
}
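The commit-or-rollback decision hinges on two properties of the executor API: invokeAll blocks until every task has finished, and Future.get() rethrows a task's exception wrapped in an ExecutionException. The following database-free sketch shows just that behavior; the class name RunTasksDemo and the method allSucceeded are made up for illustration, and the tasks are plain lambdas standing in for the SQL jobs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the lambdas stand in for the SQL jobs.
final class RunTasksDemo {
    // Returns true if every task completed normally, false if any task threw.
    // Assumes a non-empty task list (a fixed pool needs at least one thread).
    static boolean allSucceeded(List<Callable<Void>> tasks) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
        try {
            // Blocks until every task has finished, successfully or not.
            List<Future<Void>> futures = executorService.invokeAll(tasks);
            for (Future<Void> future : futures) {
                try {
                    future.get(); // rethrows the task's exception, wrapped
                } catch (ExecutionException ex) {
                    return false;
                }
            }
            return true;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Callable<Void> ok = () -> null;
        Callable<Void> failing = () -> {
            throw new IllegalStateException("duplicate key");
        };
        System.out.println(allSucceeded(Arrays.asList(ok, ok, ok)));      // prints "true"
        System.out.println(allSucceeded(Arrays.asList(ok, failing, ok))); // prints "false"
    }
}
```

Because invokeAll has already waited for all tasks, no job is still running when the first failure is detected, so it is safe to roll back every connection at that point.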
Result
Given the default data returned by getTableData(...)
test1 -> `a`
test2 -> `a`
test3 -> `a`
and the fact that test2 already contains `a` (and the data column is unique), the job that inserts into test2 will fail and throw an exception, so every connection will be rolled back.
If getTableData(...) returns `b` instead of `a`, no job fails and every connection is committed.
This can be done similarly with LOAD DATA.
After the OP's response to my answer, I realized that what they want to do isn't possible in a simple and clear manner.
Basically, the problem is that once a commit has succeeded, the committed data can't be rolled back, because the operation is atomic. Since multiple commits are needed in this case, rolling back everything isn't possible unless one tracks all the data (in all of the transactions) and, if something goes wrong, deletes everything that was successfully committed.
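To make the "track everything and delete it afterwards" idea concrete, here is a rough, database-free sketch. The class CompensationLog and its methods are hypothetical names of mine; in a real setup each undo action would issue a DELETE for the rows that the corresponding transaction committed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration of "track what was committed, undo it on failure".
final class CompensationLog {
    private final Deque<Runnable> undoActions = new ArrayDeque<>();

    // Call after a step has been committed, passing the action that reverses it.
    void recordCommitted(Runnable undo) {
        undoActions.push(undo);
    }

    // Runs the recorded undo actions, most recently committed step first.
    void compensate() {
        while (!undoActions.isEmpty())
            undoActions.pop().run();
    }

    public static void main(String[] args) {
        List<String> committed = new ArrayList<>(); // stands in for the database
        CompensationLog log = new CompensationLog();
        try {
            committed.add("test1:a");
            log.recordCommitted(() -> committed.remove("test1:a"));
            committed.add("test3:a");
            log.recordCommitted(() -> committed.remove("test3:a"));
            throw new IllegalStateException("insert into test2 failed");
        } catch (IllegalStateException ex) {
            log.compensate();
        }
        System.out.println(committed); // prints "[]"
    }
}
```

Unlike a real rollback, this kind of compensation is not atomic: other sessions can see the committed rows before they are deleted, and an undo action can itself fail.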
There is a nice answer relating to the issue of commits and rollbacks.