在分布式应用中,统计当前登录用户数量是一个常见需求。通常,开发者会选择将用户计数存储在共享缓存中,例如infinispan。当用户登录时,从缓存中获取当前值,加1后再存回缓存。然而,在多用户同时登录的高并发场景下,这种简单的“读取-修改-写入”操作序列极易引发竞态条件(race condition),导致计数不准确。例如,两个用户几乎同时登录,都读取到旧的计数n,然后各自加1并写回n+1,最终结果本应是n+2,却只变成了n+1,造成数据丢失。为了解决这一问题,infinispan提供了多种强大的并发控制机制。
1. 使用Infinispan分布式计数器
Infinispan提供了一个专门的分布式计数器(Counters)API,旨在解决这种原子性增减操作的需求。它支持强一致性计数器(StrongCounter)和弱一致性计数器(WeakCounter)。对于用户登录统计这种需要精确计数的场景,通常推荐使用强一致性计数器。
特点:
- 原子性: 保证增减操作的原子性,即使在分布式环境中也能避免竞态条件。
- 持久性: 计数器值可以被持久化。
- 简单易用: API设计简洁,专注于计数操作。
示例代码:
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.StrongCounter;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterType;
public class UserCounterService {
private final RemoteCacheManager cacheManager;
private final StrongCounter loggedInUsersCounter;
public UserCounterService(RemoteCache RemoteCacheManager cacheManager) {
this.cacheManager = cacheManager;
CounterManager counterManager = cacheManager.get
CounterManager();
// 获取或创建强一致性计数器
// 可以配置初始值、上限、下限等
this.loggedInUsersCounter = counterManager.define
StrongCounter("loggedInUsers",
CounterConfiguration.builder(CounterType.BOUNDED_STRONG)
.initialValue(0)
.lowerBound(0)
.build());
// 或者直接获取已定义的计数器
// this.loggedInUsersCounter = counterManager.getStrongCounter("loggedInUsers");
}
/**
* 用户登录时调用,增加计数
*/
public void userLoggedIn() {
loggedInUsersCounter.increment();
System.out.println("用户登录,当前用户数: " + loggedInUsersCounter.getValue());
}
/**
* 用户登出时调用,减少计数
*/
public void userLoggedOut() {
loggedInUsersCounter.decrement();
System.out.println("用户登出,当前用户数: " + loggedInUsersCounter.getValue());
}
/**
* 获取当前登录用户数
* @return 当前用户数
*/
public long getCurrentLoggedInUsers() {
return loggedInUsersCounter.getValue();
}
public static void main(String[] args) {
// 假设已经配置并启动了RemoteCacheManager
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
UserCounterService service = new UserCounterService(remoteCacheManager);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(service::userLoggedIn).start();
}
try {
Thread.sleep(1000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
}2. 使用Infinispan事务
Infinispan支持分布式事务,可以将一系列操作封装在一个原子单元中。这意味着,在一个事务中的所有操作要么全部成功,要么全部失败。通过将“读取-修改-写入”逻辑放入一个事务中,可以确保其原子性。
特点:
- ACID特性: 提供原子性、一致性、隔离性和持久性(如果配置了持久化)。
- 复杂操作: 适用于需要对多个键或执行更复杂逻辑的场景。
- 性能开销: 相比计数器,事务通常会有更高的性能开销,因为它涉及更多的协调和锁定机制。
示例代码:
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.api.CacheContainer;
import org.infinispan.commons.api.CacheContainer.Builder;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
public class UserLoginTransactionalService {
private final RemoteCache userCountCache;
private final TransactionManager tm; // JBoss Transaction Manager or similar
private final UserTransaction ut;
public UserLoginTransactionalService(RemoteCacheManager cacheManager) throws Exception {
// 确保缓存配置为事务模式
// 在服务器端或客户端配置中设置 transactionMode(TransactionMode.TRANSACTIONAL)
this.userCountCache = cacheManager.getCache("userCountCache");
// 获取JTA事务管理器和用户事务
// 在实际应用中,通常通过JNDI查找或DI框架注入
this.tm = com.arjuna.ats.jta.TransactionManager.transactionManager(); // 示例:使用Narayana
this.ut = com.arjuna.ats.jta.UserTransaction.userTransaction();
}
public void userLoggedIn() throws Exception {
ut.begin(); // 开启事务
try {
Long currentCount = userCountCache.get("loggedInUsers");
if (currentCount == null) {
currentCount = 0L;
}
userCountCache.put("loggedInUsers", currentCount + 1);
ut.commit(); // 提交事务
System.out.println("用户登录(事务),当前用户数: " + userCountCache.get("loggedInUsers"));
} catch (Exception e) {
ut.rollback(); // 回滚事务
System.err.println("用户登录事务失败: " + e.getMessage());
throw e;
}
}
public long getCurrentLoggedInUsers() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) throws Exception {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
// 确保服务器端缓存 'userCountCache' 配置为 TRANSACTIONAL 模式
UserLoginTransactionalService service = new UserLoginTransactionalService(remoteCacheManager);
// 初始化计数
service.userCountCache.put("loggedInUsers", 0L);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
service.userLoggedIn();
} catch (Exception e) {
// 处理异常
}
}).start();
}
try {
Thread.sleep(2000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
} 注意事项:
- JTA集成: Infinispan的事务通常与Java事务API (JTA) 集成。你需要一个JTA事务管理器(如Narayana、Atomikos)来管理事务生命周期。
- 缓存配置: 相关的Infinispan缓存必须在服务器端配置为事务模式(transactionMode="TRANSACTIONAL")。
- 锁定模式: Infinispan支持乐观锁定和悲观锁定。乐观锁定(默认)在提交时检查冲突,悲观锁定在事务开始时就获取锁。根据并发程度和冲突概率选择合适的模式。
3. 使用版本化操作(Conditional Puts)
Infinispan的Hot Rod客户端API提供了一些条件操作,例如replace(K key, V oldValue, V newValue)和putIfAbsent(K key, V value)。这些操作只有在满足特定条件时才会执行,可以用于实现乐观锁机制。
特点:
- 乐观并发控制: 假设冲突不常发生,通过检查版本或旧值来解决冲突。
- 无事务开销: 不需要显式的事务管理器,开销相对较小。
- 需要重试逻辑: 当条件不满足时,操作会失败,客户端需要实现重试逻辑。
示例代码:
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
public class UserLoginVersionedService {
private final RemoteCache userCountCache;
public UserLoginVersionedService(RemoteCacheManager cacheManager) {
this.userCountCache = cacheManager.getCache("userCountCache");
}
public void userLoggedIn() {
int maxRetries = 5; // 最大重试次数
for (int i = 0; i < maxRetries; i++) {
Long currentCount = userCountCache.get("loggedInUsers");
if (currentCount == null) {
currentCount = 0L;
}
Long newCount = currentCount + 1;
// 尝试原子性替换:只有当键'loggedInUsers'的值仍然是currentCount时,才将其更新为newCount
boolean success = userCountCache.replace("loggedInUsers", currentCount, newCount);
if (success) {
System.out.println("用户登录(版本化),当前用户数: " + newCount);
return; // 更新成功,退出循环
} else {
// 更新失败,说明其他线程在此期间修改了值,需要重试
System.out.println("更新冲突,重试... (当前尝试: " + (i + 1) + ")");
try {
Thread.sleep(100); // 稍作等待,避免紧密循环
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
System.err.println("用户登录失败,达到最大重试次数。");
}
public long getCurrentLoggedInUsers() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
UserLoginVersionedService service = new UserLoginVersionedService(remoteCacheManager);
// 初始化计数
service.userCountCache.put("loggedInUsers", 0L);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(service::userLoggedIn).start();
}
try {
Thread.sleep(2000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
} 选择合适的策略
- 对于简单的原子性增减操作(如用户计数):Infinispan分布式计数器是最佳选择。它专门为此类场景设计,API简洁,性能高效,且保证强一致性。
- 对于涉及多个键或更复杂业务逻辑的原子性操作:Infinispan事务更为合适。它提供了完整的ACID保证,但会引入JTA依赖和更高的性能开销。
- 对于对性能要求极高,且可以容忍少量重试,或事务开销过大的场景:版本化操作结合重试机制可以作为一种替代方案。它提供了乐观并发控制,但需要客户端自行实现重试逻辑。
注意事项与最佳实践
- Infinispan服务器配置: 确保Infinispan服务器端已正确配置缓存,特别是对于事务模式和持久化需求。
- 错误处理: 无论是计数器、事务还是版本化操作,都应包含健壮的错误处理逻辑,例如事务回滚、重试机制等。
- 性能考量: 评估不同策略的性能影响。计数器通常最轻量级,事务开销最大。
- 一致性模型: 理解强一致性与弱一致性计数器的区别,根据业务需求选择。用户登录计数通常需要强一致性。
- 集群环境: Infinispan的这些机制都是为分布式集群环境设计的,确保你的集群配置正确,以充分利用其并发控制能力。
总结
在分布式系统中处理并发计数问题是构建高可用和数据一致性应用的关键。Infinispan作为一款强大的分布式缓存,提供了多种机制来应对这些挑战。对于用户登录计数这类常见场景,Infinispan的分布式计数器提供了最直接和高效的解决方案。当业务逻辑更复杂时,事务和版本化操作则提供了更灵活的并发控制手段。理解并选择最适合你应用场景的Infinispan并发控制策略,是确保系统健壮性和数据准确性的重要一步。

try {
Thread.sleep(100); // 稍作等待,避免紧密循环
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
System.err.println("用户登录失败,达到最大重试次数。");
}
public long getCurrentLoggedInUsers() {
Long count = userCountCache.get("loggedInUsers");
return count != null ? count : 0L;
}
public static void main(String[] args) {
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("127.0.0.1:11222"); // 示例配置
UserLoginVersionedService service = new UserLoginVersionedService(remoteCacheManager);
// 初始化计数
service.userCountCache.put("loggedInUsers", 0L);
// 模拟多个用户同时登录
for (int i = 0; i < 10; i++) {
new Thread(service::userLoggedIn).start();
}
try {
Thread.sleep(2000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终登录用户数: " + service.getCurrentLoggedInUsers());
remoteCacheManager.stop();
}
}






