init
This commit is contained in:
111
app/api/v1/heartbeat/route.ts
Normal file
111
app/api/v1/heartbeat/route.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import { NextRequest, NextResponse } from "next/server";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { db } from "@/lib/db";
|
||||
import { lobsters, heartbeats } from "@/lib/db/schema";
|
||||
import {
|
||||
setLobsterOnline,
|
||||
updateActiveLobsters,
|
||||
incrementHourlyActivity,
|
||||
publishEvent,
|
||||
} from "@/lib/redis";
|
||||
import { validateApiKey } from "@/lib/auth/api-key";
|
||||
import { getGeoLocation } from "@/lib/geo/ip-location";
|
||||
import { heartbeatSchema } from "@/lib/validators/schemas";
|
||||
|
||||
function getClientIp(req: NextRequest): string {
|
||||
const forwarded = req.headers.get("x-forwarded-for");
|
||||
if (forwarded) return forwarded.split(",")[0].trim();
|
||||
const realIp = req.headers.get("x-real-ip");
|
||||
if (realIp) return realIp.trim();
|
||||
return "127.0.0.1";
|
||||
}
|
||||
|
||||
export async function POST(req: NextRequest) {
|
||||
try {
|
||||
const authHeader = req.headers.get("authorization");
|
||||
if (!authHeader?.startsWith("Bearer ")) {
|
||||
return NextResponse.json(
|
||||
{ error: "Missing or invalid authorization header" },
|
||||
{ status: 401 }
|
||||
);
|
||||
}
|
||||
|
||||
const apiKey = authHeader.slice(7);
|
||||
const lobster = await validateApiKey(apiKey);
|
||||
if (!lobster) {
|
||||
return NextResponse.json({ error: "Invalid API key" }, { status: 401 });
|
||||
}
|
||||
|
||||
let bodyData = {};
|
||||
try {
|
||||
const rawBody = await req.text();
|
||||
if (rawBody) bodyData = JSON.parse(rawBody);
|
||||
} catch {
|
||||
// Empty body is fine for heartbeat
|
||||
}
|
||||
|
||||
const parsed = heartbeatSchema.safeParse(bodyData);
|
||||
if (!parsed.success) {
|
||||
return NextResponse.json(
|
||||
{ error: "Validation failed", details: parsed.error.flatten() },
|
||||
{ status: 400 }
|
||||
);
|
||||
}
|
||||
|
||||
const clientIp = getClientIp(req);
|
||||
const now = new Date();
|
||||
|
||||
const updateFields: Record<string, unknown> = {
|
||||
lastHeartbeat: now,
|
||||
ip: clientIp,
|
||||
updatedAt: now,
|
||||
};
|
||||
|
||||
if (clientIp !== lobster.ip) {
|
||||
const geo = await getGeoLocation(clientIp);
|
||||
if (geo) {
|
||||
updateFields.latitude = String(geo.latitude);
|
||||
updateFields.longitude = String(geo.longitude);
|
||||
updateFields.city = geo.city;
|
||||
updateFields.country = geo.country;
|
||||
updateFields.countryCode = geo.countryCode;
|
||||
updateFields.region = geo.region;
|
||||
}
|
||||
}
|
||||
|
||||
if (parsed.data.name) updateFields.name = parsed.data.name;
|
||||
if (parsed.data.model) updateFields.model = parsed.data.model;
|
||||
if (parsed.data.platform) updateFields.platform = parsed.data.platform;
|
||||
|
||||
await setLobsterOnline(lobster.id, clientIp);
|
||||
await updateActiveLobsters(lobster.id);
|
||||
await incrementHourlyActivity();
|
||||
|
||||
await db
|
||||
.update(lobsters)
|
||||
.set(updateFields)
|
||||
.where(eq(lobsters.id, lobster.id));
|
||||
|
||||
// Insert heartbeat record asynchronously
|
||||
db.insert(heartbeats)
|
||||
.values({ lobsterId: lobster.id, ip: clientIp, timestamp: now })
|
||||
.then(() => {})
|
||||
.catch((err: unknown) => console.error("Failed to insert heartbeat:", err));
|
||||
|
||||
await publishEvent({
|
||||
type: "heartbeat",
|
||||
lobsterId: lobster.id,
|
||||
lobsterName: (updateFields.name as string) ?? lobster.name,
|
||||
city: (updateFields.city as string) ?? lobster.city,
|
||||
country: (updateFields.country as string) ?? lobster.country,
|
||||
});
|
||||
|
||||
return NextResponse.json({ ok: true, nextIn: 180 });
|
||||
} catch (error) {
|
||||
console.error("Heartbeat error:", error);
|
||||
return NextResponse.json(
|
||||
{ error: "Internal server error" },
|
||||
{ status: 500 }
|
||||
);
|
||||
}
|
||||
}
|
||||
61
app/api/v1/heatmap/route.ts
Normal file
61
app/api/v1/heatmap/route.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import { NextResponse } from "next/server";
|
||||
import { gte, and, isNotNull, sql } from "drizzle-orm";
|
||||
import { db } from "@/lib/db";
|
||||
import { lobsters } from "@/lib/db/schema";
|
||||
import { getCacheHeatmap, setCacheHeatmap } from "@/lib/redis";
|
||||
|
||||
export async function GET() {
|
||||
try {
|
||||
const cached = await getCacheHeatmap();
|
||||
if (cached) {
|
||||
return NextResponse.json(JSON.parse(cached));
|
||||
}
|
||||
|
||||
const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000);
|
||||
|
||||
const activeLobsters = await db
|
||||
.select({
|
||||
city: lobsters.city,
|
||||
country: lobsters.country,
|
||||
latitude: lobsters.latitude,
|
||||
longitude: lobsters.longitude,
|
||||
count: sql<number>`count(*)`,
|
||||
})
|
||||
.from(lobsters)
|
||||
.where(
|
||||
and(
|
||||
gte(lobsters.lastHeartbeat, fiveMinutesAgo),
|
||||
isNotNull(lobsters.latitude),
|
||||
isNotNull(lobsters.longitude)
|
||||
)
|
||||
)
|
||||
.groupBy(
|
||||
lobsters.city,
|
||||
lobsters.country,
|
||||
lobsters.latitude,
|
||||
lobsters.longitude
|
||||
);
|
||||
|
||||
const points = activeLobsters.map((row) => ({
|
||||
lat: Number(row.latitude),
|
||||
lng: Number(row.longitude),
|
||||
weight: row.count,
|
||||
lobsterCount: row.count,
|
||||
city: row.city,
|
||||
country: row.country,
|
||||
}));
|
||||
|
||||
const lastUpdated = new Date().toISOString();
|
||||
const responseData = { points, lastUpdated };
|
||||
|
||||
await setCacheHeatmap(JSON.stringify(responseData));
|
||||
|
||||
return NextResponse.json(responseData);
|
||||
} catch (error) {
|
||||
console.error("Heatmap error:", error);
|
||||
return NextResponse.json(
|
||||
{ error: "Internal server error" },
|
||||
{ status: 500 }
|
||||
);
|
||||
}
|
||||
}
|
||||
73
app/api/v1/lobsters/route.ts
Normal file
73
app/api/v1/lobsters/route.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { NextRequest, NextResponse } from "next/server";
|
||||
import { eq, desc, sql, and } from "drizzle-orm";
|
||||
import { db } from "@/lib/db";
|
||||
import { lobsters, tasks } from "@/lib/db/schema";
|
||||
import { getActiveLobsterIds } from "@/lib/redis";
|
||||
|
||||
export async function GET(req: NextRequest) {
|
||||
try {
|
||||
const { searchParams } = new URL(req.url);
|
||||
const limitParam = parseInt(searchParams.get("limit") ?? "50", 10);
|
||||
const limit = Math.min(Math.max(1, limitParam), 200);
|
||||
const region = searchParams.get("region");
|
||||
|
||||
const conditions = [];
|
||||
if (region) {
|
||||
conditions.push(eq(lobsters.region, region));
|
||||
}
|
||||
|
||||
const whereClause = conditions.length > 0 ? and(...conditions) : undefined;
|
||||
|
||||
const lobsterRows = await db
|
||||
.select()
|
||||
.from(lobsters)
|
||||
.where(whereClause)
|
||||
.orderBy(desc(lobsters.lastHeartbeat))
|
||||
.limit(limit);
|
||||
|
||||
const totalResult = await db
|
||||
.select({ count: sql<number>`count(*)` })
|
||||
.from(lobsters)
|
||||
.where(whereClause);
|
||||
const total = totalResult[0]?.count ?? 0;
|
||||
|
||||
const activeLobsterIds = await getActiveLobsterIds();
|
||||
const activeSet = new Set(activeLobsterIds);
|
||||
|
||||
const lobsterList = await Promise.all(
|
||||
lobsterRows.map(async (lobster) => {
|
||||
const latestTaskRows = await db
|
||||
.select({
|
||||
summary: tasks.summary,
|
||||
timestamp: tasks.timestamp,
|
||||
durationMs: tasks.durationMs,
|
||||
})
|
||||
.from(tasks)
|
||||
.where(eq(tasks.lobsterId, lobster.id))
|
||||
.orderBy(desc(tasks.timestamp))
|
||||
.limit(1);
|
||||
|
||||
const lastTask = latestTaskRows[0] ?? null;
|
||||
|
||||
return {
|
||||
id: lobster.id,
|
||||
name: lobster.name,
|
||||
model: lobster.model,
|
||||
platform: lobster.platform,
|
||||
city: lobster.city,
|
||||
country: lobster.country,
|
||||
isOnline: activeSet.has(lobster.id),
|
||||
lastTask,
|
||||
};
|
||||
})
|
||||
);
|
||||
|
||||
return NextResponse.json({ lobsters: lobsterList, total });
|
||||
} catch (error) {
|
||||
console.error("Lobsters error:", error);
|
||||
return NextResponse.json(
|
||||
{ error: "Internal server error" },
|
||||
{ status: 500 }
|
||||
);
|
||||
}
|
||||
}
|
||||
89
app/api/v1/register/route.ts
Normal file
89
app/api/v1/register/route.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
import { NextRequest, NextResponse } from "next/server";
|
||||
import { nanoid } from "nanoid";
|
||||
import { db } from "@/lib/db";
|
||||
import { lobsters } from "@/lib/db/schema";
|
||||
import {
|
||||
setLobsterOnline,
|
||||
updateActiveLobsters,
|
||||
incrementGlobalStat,
|
||||
incrementRegionCount,
|
||||
publishEvent,
|
||||
} from "@/lib/redis";
|
||||
import { generateApiKey } from "@/lib/auth/api-key";
|
||||
import { getGeoLocation } from "@/lib/geo/ip-location";
|
||||
import { registerSchema } from "@/lib/validators/schemas";
|
||||
|
||||
function getClientIp(req: NextRequest): string {
|
||||
const forwarded = req.headers.get("x-forwarded-for");
|
||||
if (forwarded) return forwarded.split(",")[0].trim();
|
||||
const realIp = req.headers.get("x-real-ip");
|
||||
if (realIp) return realIp.trim();
|
||||
return "127.0.0.1";
|
||||
}
|
||||
|
||||
export async function POST(req: NextRequest) {
|
||||
try {
|
||||
const body = await req.json();
|
||||
const parsed = registerSchema.safeParse(body);
|
||||
if (!parsed.success) {
|
||||
return NextResponse.json(
|
||||
{ error: "Validation failed", details: parsed.error.flatten() },
|
||||
{ status: 400 }
|
||||
);
|
||||
}
|
||||
|
||||
const { name, model, platform } = parsed.data;
|
||||
const lobsterId = nanoid(21);
|
||||
const apiKey = generateApiKey();
|
||||
const clientIp = getClientIp(req);
|
||||
const geo = await getGeoLocation(clientIp);
|
||||
const now = new Date();
|
||||
|
||||
await db.insert(lobsters).values({
|
||||
id: lobsterId,
|
||||
apiKey,
|
||||
name,
|
||||
model: model ?? null,
|
||||
platform: platform ?? null,
|
||||
ip: clientIp,
|
||||
latitude: geo ? String(geo.latitude) : null,
|
||||
longitude: geo ? String(geo.longitude) : null,
|
||||
city: geo?.city ?? null,
|
||||
country: geo?.country ?? null,
|
||||
countryCode: geo?.countryCode ?? null,
|
||||
region: geo?.region ?? null,
|
||||
lastHeartbeat: now,
|
||||
totalTasks: 0,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
});
|
||||
|
||||
await setLobsterOnline(lobsterId, clientIp);
|
||||
await updateActiveLobsters(lobsterId);
|
||||
await incrementGlobalStat("total_lobsters");
|
||||
|
||||
if (geo?.region) {
|
||||
await incrementRegionCount(geo.region);
|
||||
}
|
||||
|
||||
await publishEvent({
|
||||
type: "online",
|
||||
lobsterId,
|
||||
lobsterName: name,
|
||||
city: geo?.city ?? null,
|
||||
country: geo?.country ?? null,
|
||||
});
|
||||
|
||||
return NextResponse.json({
|
||||
lobsterId,
|
||||
apiKey,
|
||||
endpoint: `${process.env.NEXT_PUBLIC_APP_URL}/api/v1`,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("Register error:", error);
|
||||
return NextResponse.json(
|
||||
{ error: "Internal server error" },
|
||||
{ status: 500 }
|
||||
);
|
||||
}
|
||||
}
|
||||
76
app/api/v1/stats/route.ts
Normal file
76
app/api/v1/stats/route.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import { NextResponse } from "next/server";
|
||||
import { gte, sql } from "drizzle-orm";
|
||||
import { db } from "@/lib/db";
|
||||
import { lobsters, tasks } from "@/lib/db/schema";
|
||||
import {
|
||||
redis,
|
||||
getGlobalStats,
|
||||
getRegionStats,
|
||||
getHourlyActivity,
|
||||
} from "@/lib/redis";
|
||||
|
||||
export async function GET() {
|
||||
try {
|
||||
const globalStats = await getGlobalStats();
|
||||
const regionStats = await getRegionStats();
|
||||
const hourlyRaw = await getHourlyActivity();
|
||||
|
||||
// Convert hourly data to array format
|
||||
const hourlyActivity = Object.entries(hourlyRaw).map(([hour, count]) => ({
|
||||
hour: hour.split("T")[1] + ":00",
|
||||
count,
|
||||
}));
|
||||
|
||||
const now = Date.now();
|
||||
const fiveMinutesAgo = now - 300_000;
|
||||
const activeLobsters = await redis.zcount(
|
||||
"active:lobsters",
|
||||
fiveMinutesAgo,
|
||||
"+inf"
|
||||
);
|
||||
|
||||
const totalLobstersResult = await db
|
||||
.select({ count: sql<number>`count(*)` })
|
||||
.from(lobsters);
|
||||
const totalLobsters = totalLobstersResult[0]?.count ?? 0;
|
||||
|
||||
const todayStart = new Date();
|
||||
todayStart.setHours(0, 0, 0, 0);
|
||||
|
||||
const tasksTodayResult = await db
|
||||
.select({ count: sql<number>`count(*)` })
|
||||
.from(tasks)
|
||||
.where(gte(tasks.timestamp, todayStart));
|
||||
const tasksToday = tasksTodayResult[0]?.count ?? 0;
|
||||
|
||||
const avgDurationResult = await db
|
||||
.select({ avg: sql<number>`AVG(${tasks.durationMs})` })
|
||||
.from(tasks)
|
||||
.where(gte(tasks.timestamp, todayStart));
|
||||
const avgTaskDuration = avgDurationResult[0]?.avg
|
||||
? Math.round(avgDurationResult[0].avg)
|
||||
: 0;
|
||||
|
||||
// Convert region stats from string values to numbers
|
||||
const regionBreakdown: Record<string, number> = {};
|
||||
for (const [key, val] of Object.entries(regionStats)) {
|
||||
regionBreakdown[key] = parseInt(val, 10) || 0;
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
totalLobsters,
|
||||
activeLobsters,
|
||||
tasksToday,
|
||||
tasksTotal: parseInt(globalStats.total_tasks ?? "0", 10),
|
||||
avgTaskDuration,
|
||||
regionBreakdown,
|
||||
hourlyActivity,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("Stats error:", error);
|
||||
return NextResponse.json(
|
||||
{ error: "Internal server error" },
|
||||
{ status: 500 }
|
||||
);
|
||||
}
|
||||
}
|
||||
60
app/api/v1/stream/route.ts
Normal file
60
app/api/v1/stream/route.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import { NextRequest } from "next/server";
|
||||
import Redis from "ioredis";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
export const runtime = "nodejs";
|
||||
|
||||
export async function GET(req: NextRequest) {
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
const subscriber = new Redis(process.env.REDIS_URL!);
|
||||
|
||||
subscriber.subscribe("channel:realtime");
|
||||
|
||||
subscriber.on("message", (_channel: string, message: string) => {
|
||||
try {
|
||||
const data = JSON.parse(message);
|
||||
controller.enqueue(
|
||||
encoder.encode(
|
||||
`event: ${data.type}\ndata: ${JSON.stringify(data)}\n\n`
|
||||
)
|
||||
);
|
||||
} catch {
|
||||
// skip malformed messages
|
||||
}
|
||||
});
|
||||
|
||||
// Send keepalive every 30 seconds
|
||||
const keepalive = setInterval(() => {
|
||||
try {
|
||||
controller.enqueue(encoder.encode(": keepalive\n\n"));
|
||||
} catch {
|
||||
clearInterval(keepalive);
|
||||
}
|
||||
}, 30000);
|
||||
|
||||
// Send initial connection event
|
||||
controller.enqueue(
|
||||
encoder.encode(`event: connected\ndata: {"status":"connected"}\n\n`)
|
||||
);
|
||||
|
||||
// Cleanup on close
|
||||
req.signal.addEventListener("abort", () => {
|
||||
clearInterval(keepalive);
|
||||
subscriber.unsubscribe("channel:realtime");
|
||||
subscriber.quit();
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
Connection: "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
});
|
||||
}
|
||||
80
app/api/v1/task/route.ts
Normal file
80
app/api/v1/task/route.ts
Normal file
@@ -0,0 +1,80 @@
|
||||
import { NextRequest, NextResponse } from "next/server";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import { db } from "@/lib/db";
|
||||
import { lobsters, tasks } from "@/lib/db/schema";
|
||||
import {
|
||||
incrementGlobalStat,
|
||||
incrementHourlyActivity,
|
||||
publishEvent,
|
||||
} from "@/lib/redis";
|
||||
import { validateApiKey } from "@/lib/auth/api-key";
|
||||
import { taskSchema } from "@/lib/validators/schemas";
|
||||
|
||||
export async function POST(req: NextRequest) {
|
||||
try {
|
||||
const authHeader = req.headers.get("authorization");
|
||||
if (!authHeader?.startsWith("Bearer ")) {
|
||||
return NextResponse.json(
|
||||
{ error: "Missing or invalid authorization header" },
|
||||
{ status: 401 }
|
||||
);
|
||||
}
|
||||
|
||||
const apiKey = authHeader.slice(7);
|
||||
const lobster = await validateApiKey(apiKey);
|
||||
if (!lobster) {
|
||||
return NextResponse.json({ error: "Invalid API key" }, { status: 401 });
|
||||
}
|
||||
|
||||
const body = await req.json();
|
||||
const parsed = taskSchema.safeParse(body);
|
||||
if (!parsed.success) {
|
||||
return NextResponse.json(
|
||||
{ error: "Validation failed", details: parsed.error.flatten() },
|
||||
{ status: 400 }
|
||||
);
|
||||
}
|
||||
|
||||
const { summary, durationMs, model, toolsUsed } = parsed.data;
|
||||
const now = new Date();
|
||||
|
||||
const insertResult = await db.insert(tasks).values({
|
||||
lobsterId: lobster.id,
|
||||
summary,
|
||||
durationMs,
|
||||
model: model ?? null,
|
||||
toolsUsed: toolsUsed ?? null,
|
||||
timestamp: now,
|
||||
});
|
||||
|
||||
await db
|
||||
.update(lobsters)
|
||||
.set({ totalTasks: sql`${lobsters.totalTasks} + 1`, updatedAt: now })
|
||||
.where(eq(lobsters.id, lobster.id));
|
||||
|
||||
await incrementGlobalStat("total_tasks");
|
||||
await incrementGlobalStat("tasks_today");
|
||||
await incrementHourlyActivity();
|
||||
|
||||
await publishEvent({
|
||||
type: "task",
|
||||
lobsterId: lobster.id,
|
||||
lobsterName: lobster.name,
|
||||
city: lobster.city,
|
||||
country: lobster.country,
|
||||
summary,
|
||||
durationMs,
|
||||
});
|
||||
|
||||
return NextResponse.json({
|
||||
ok: true,
|
||||
taskId: insertResult[0].insertId,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("Task error:", error);
|
||||
return NextResponse.json(
|
||||
{ error: "Internal server error" },
|
||||
{ status: 500 }
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user