fix: easier import proxies

This commit is contained in:
sajadMRjl
2026-01-28 15:09:13 +03:30
parent 644e135464
commit a022ea60ea
7 changed files with 214 additions and 88 deletions

View File

@@ -23,44 +23,57 @@ func main() {
cfg := config.Load()
logger.Setup(cfg.LogLevel)
// Graceful Shutdown Channel
// CLI Argument Override
// Usage: ./find-me-internet [OPTIONAL_INPUT_SOURCE]
if len(os.Args) > 1 {
cfg.InputPath = os.Args[1]
slog.Info("input_source_overridden", "source", cfg.InputPath)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 2. Services
geoDB, err := geoip.Open(cfg.GeoIPPath)
if err != nil {
slog.Warn("geoip_db_missing", "error", err, "msg", "Countries will be marked N/A")
slog.Warn("geoip_db_missing", "error", err)
} else {
defer geoDB.Close()
}
resultsWriter, err := sink.NewJSONL(cfg.OutputPath)
jsonWriter, err := sink.NewJSONL(cfg.OutputPath)
if err != nil {
slog.Error("cannot_create_output_file", "error", err)
slog.Error("cannot_create_json_output", "error", err)
os.Exit(1)
}
defer resultsWriter.Close()
defer jsonWriter.Close()
txtWriter, err := sink.NewText(cfg.TxtOutputPath)
if err != nil {
slog.Error("cannot_create_txt_output", "error", err)
os.Exit(1)
}
defer txtWriter.Close()
deduplicator := dedup.New()
netFilter := filter.NewPipeline(cfg.TcpTimeout)
boxRunner := tester.NewRunner(cfg.SingBoxPath, cfg.TestURL, cfg.TestTimeout)
// 3. Input Stream
linkStream, err := source.LoadFromFile(cfg.InputPath)
// 3. Input Stream (Smart Load)
// Supports both http://... and ./path/to/file.txt
linkStream, err := source.Load(cfg.InputPath)
if err != nil {
slog.Error("input_source_failed", "error", err)
slog.Error("input_source_failed", "error", err, "path", cfg.InputPath)
os.Exit(1)
}
// 4. Worker Pool & Counters
// 4. Worker Pool
var wg sync.WaitGroup
semaphore := make(chan struct{}, cfg.Workers)
// Counters for final stats
countProcessed := 0
countValid := 0
var mu sync.Mutex // Protects countValid from race conditions
var mu sync.Mutex
slog.Info("pipeline_started", "workers", cfg.Workers)
@@ -72,70 +85,57 @@ loop:
slog.Info("shutdown_signal_received", "msg", "finishing pending jobs...")
break loop
default:
// Continue
}
wg.Add(1)
go func(raw string) {
defer wg.Done()
// --- STAGE 1: PARSE ---
// A. Parse
proxy, err := parser.ParseLink(raw)
if err != nil {
return
}
if err != nil { return }
// --- STAGE 2: DEDUP ---
if deduplicator.Seen(proxy.Address, proxy.Port) {
return // Skip duplicates silently
}
// B. Dedup
if deduplicator.Seen(proxy.Address, proxy.Port) { return }
// --- STAGE 3: FILTER ---
if !netFilter.Check(proxy) {
return
}
// C. Filter
if !netFilter.Check(proxy) { return }
// --- STAGE 4: TEST ---
semaphore <- struct{}{} // Rate limit expensive tests
// D. Test
semaphore <- struct{}{}
err = boxRunner.Test(proxy)
<-semaphore
if err != nil {
return
}
if err != nil { return }
// --- STAGE 5: ENRICH ---
// E. Enrich
if geoDB != nil {
proxy.Country = geoDB.Lookup(proxy.Address)
}
// --- STAGE 6: SAVE ---
if err := resultsWriter.Write(proxy); err != nil {
slog.Error("write_failed", "error", err)
} else {
// Thread-Safe Increment
mu.Lock()
countValid++
mu.Unlock()
// F. Save
jsonWriter.Write(proxy)
txtWriter.Write(proxy)
slog.Info("proxy_saved",
"country", proxy.Country,
"latency", proxy.Latency.Milliseconds(),
"type", proxy.Type,
)
}
// Stats
mu.Lock()
countValid++
mu.Unlock()
slog.Info("proxy_saved",
"country", proxy.Country,
"latency", proxy.Latency.Milliseconds(),
"type", proxy.Type,
)
}(rawLink)
countProcessed++
if countProcessed % 1000 == 0 {
slog.Info("progress_report", "processed", countProcessed, "valid_so_far", countValid)
slog.Info("progress_report", "processed", countProcessed, "valid", countValid)
}
}
wg.Wait()
slog.Info("scan_finished",
"total_processed", countProcessed,
"total_valid", countValid,
)
slog.Info("scan_finished", "total_processed", countProcessed, "total_valid", countValid)
}