Harden Android realtime recovery

This commit is contained in:
kris
2026-04-10 13:38:21 +08:00
parent c4dbfc7398
commit b1a0516717
3 changed files with 147 additions and 36 deletions

View File

@@ -54,6 +54,10 @@ public class BossApiClient {
return !getSessionCookie().isEmpty() || !getRestoreToken().isEmpty();
}
boolean hasRestoreToken() {
return !getRestoreToken().isEmpty();
}
public ApiResponse autoLogin() throws IOException, JSONException {
ApiResponse response = request("POST", "/api/auth/login", new JSONObject(), false);
if (response.ok()) {

View File

@@ -1,5 +1,7 @@
package com.hyzq.boss;
import android.util.Log;
import androidx.annotation.Nullable;
import org.json.JSONException;
@@ -17,13 +19,20 @@ import java.util.Iterator;
import java.nio.charset.StandardCharsets;
final class BossRealtimeClient {
private static final String TAG = "BossRealtime";
private static final String HEARTBEAT_EVENT_NAME = "heartbeat";
private static final String REALTIME_STREAM_HTTP_401 = "REALTIME_STREAM_HTTP_401";
private static final int STREAM_CONNECT_TIMEOUT_MS = 12_000;
private static final int STREAM_READ_TIMEOUT_MS = 30_000;
private static final long INITIAL_BACKOFF_MS = 800L;
private static final long MAX_BACKOFF_MS = 5000L;
interface Listener {
void onRealtimeEvent(BossRealtimeEvent event);
}
private final BossApiClient apiClient;
private final Listener listener;
private static final String HEARTBEAT_EVENT_NAME = "heartbeat";
private volatile boolean running;
private @Nullable Thread workerThread;
private @Nullable HttpURLConnection activeConnection;
@@ -55,22 +64,40 @@ final class BossRealtimeClient {
}
private void runLoop() {
long backoffMs = 800L;
long backoffMs = INITIAL_BACKOFF_MS;
while (running) {
try {
Log.i(TAG, "Realtime stream connecting");
openAndConsumeStream();
backoffMs = 800L;
} catch (Exception ignored) {
backoffMs = INITIAL_BACKOFF_MS;
} catch (Exception error) {
if (!running) {
return;
}
if (shouldAttemptSessionRestore(error)) {
try {
BossApiClient.ApiResponse restored = apiClient.restoreSession();
if (restored.ok()) {
Log.i(TAG, "Realtime stream session restored");
backoffMs = INITIAL_BACKOFF_MS;
continue;
}
Log.w(
TAG,
"Realtime stream restore failed: " + restored.statusCode + " " + restored.message()
);
} catch (Exception restoreError) {
Log.w(TAG, "Realtime stream restore threw", restoreError);
}
}
Log.w(TAG, "Realtime stream disconnected; retrying in " + backoffMs + "ms", error);
try {
Thread.sleep(backoffMs);
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
return;
}
backoffMs = Math.min(backoffMs + 1200L, 5000L);
backoffMs = Math.min(backoffMs + 1200L, MAX_BACKOFF_MS);
}
}
}
@@ -78,38 +105,44 @@ final class BossRealtimeClient {
private void openAndConsumeStream() throws IOException {
HttpURLConnection connection = apiClient.openConnection("/api/v1/events");
activeConnection = connection;
connection.setRequestMethod("GET");
connection.setConnectTimeout(12_000);
connection.setReadTimeout(60_000);
connection.setUseCaches(false);
connection.setDoInput(true);
connection.setRequestProperty("Accept", "text/event-stream");
connection.setRequestProperty("Cache-Control", "no-cache");
connection.setRequestProperty("x-boss-native-app", "1");
String cookie = apiClient.getSessionCookie();
if (!cookie.isEmpty()) {
connection.setRequestProperty("Cookie", cookie);
}
int statusCode = connection.getResponseCode();
if (statusCode < 200 || statusCode >= 300) {
throw new IOException("REALTIME_STREAM_HTTP_" + statusCode);
}
try (InputStream inputStream = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
StringBuilder block = new StringBuilder();
String line;
while (running && (line = reader.readLine()) != null) {
if (line.isEmpty()) {
dispatchEventBlock(block.toString());
block.setLength(0);
continue;
}
block.append(line).append('\n');
try {
connection.setRequestMethod("GET");
connection.setConnectTimeout(STREAM_CONNECT_TIMEOUT_MS);
connection.setReadTimeout(STREAM_READ_TIMEOUT_MS);
connection.setUseCaches(false);
connection.setDoInput(true);
connection.setRequestProperty("Accept", "text/event-stream");
connection.setRequestProperty("Cache-Control", "no-cache");
connection.setRequestProperty("x-boss-native-app", "1");
String cookie = apiClient.getSessionCookie();
if (!cookie.isEmpty()) {
connection.setRequestProperty("Cookie", cookie);
}
if (block.length() > 0) {
dispatchEventBlock(block.toString());
int statusCode = connection.getResponseCode();
if (statusCode < 200 || statusCode >= 300) {
throw new IOException("REALTIME_STREAM_HTTP_" + statusCode);
}
Log.i(TAG, "Realtime stream connected");
try (InputStream inputStream = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
StringBuilder block = new StringBuilder();
String line;
while (running && (line = reader.readLine()) != null) {
if (line.isEmpty()) {
dispatchEventBlock(block.toString());
block.setLength(0);
continue;
}
block.append(line).append('\n');
}
if (block.length() > 0) {
dispatchEventBlock(block.toString());
}
if (running) {
Log.w(TAG, "Realtime stream ended; reopening");
}
}
} finally {
activeConnection = null;
@@ -117,6 +150,12 @@ final class BossRealtimeClient {
}
}
private boolean shouldAttemptSessionRestore(Exception error) {
return error instanceof IOException
&& REALTIME_STREAM_HTTP_401.equals(error.getMessage())
&& apiClient.hasRestoreToken();
}
private void dispatchEventBlock(String rawBlock) {
BossRealtimeEvent event = parseEventBlock(rawBlock);
if (event == null || event.eventName.isEmpty()) {

View File

@@ -0,0 +1,68 @@
import test from "node:test";
import assert from "node:assert/strict";
import { readFile } from "node:fs/promises";
const realtimeClientSourceUrl = new URL(
"../android/app/src/main/java/com/hyzq/boss/BossRealtimeClient.java",
import.meta.url,
);
async function readRealtimeClientSource() {
return readFile(realtimeClientSourceUrl, "utf8");
}
test("android realtime client restores session when the SSE stream returns 401", async () => {
const source = await readRealtimeClientSource();
assert.match(
source,
/REALTIME_STREAM_HTTP_401/,
"expected realtime client to classify 401 stream failures explicitly",
);
assert.match(
source,
/catch\s*\(Exception\s+\w+\)\s*\{[\s\S]{0,800}apiClient\.restoreSession\(\)/,
"expected realtime loop to attempt restoreSession before falling back to blind retry",
);
});
test("android realtime client does not wait a full minute before reconnecting a silent stream", async () => {
const source = await readRealtimeClientSource();
assert.doesNotMatch(
source,
/setReadTimeout\(60_000\)/,
"a 60s SSE read timeout makes message delivery feel stale after transient network stalls",
);
assert.match(
source,
/STREAM_READ_TIMEOUT_MS\s*=\s*(25_000|30_000|35_000)/,
"expected the SSE read timeout budget to stay close to the server keepalive window",
);
assert.match(
source,
/setReadTimeout\(STREAM_READ_TIMEOUT_MS\)/,
"expected the realtime stream to apply the tighter read timeout budget",
);
});
test("android realtime client emits connection logs for reconnect debugging", async () => {
const source = await readRealtimeClientSource();
assert.match(source, /android\.util\.Log/, "expected realtime client to import Android logging");
assert.match(
source,
/Log\.(i|w|e)\(TAG,\s*"/,
"expected realtime client to log connection state changes for field debugging",
);
});
test("android realtime client always clears the active connection after stream setup failures", async () => {
const source = await readRealtimeClientSource();
assert.match(
source,
/try\s*\{[\s\S]{0,1200}int statusCode = connection\.getResponseCode\(\);[\s\S]{0,220}throw new IOException\("REALTIME_STREAM_HTTP_" \+ statusCode\);[\s\S]{0,2000}finally\s*\{\s*activeConnection = null;\s*connection\.disconnect\(\);\s*\}/,
"expected non-2xx SSE setup failures to still flow through the connection cleanup finally block",
);
});