Upwork-Jobs-scraper-/upwork/upworkPipeline.go

154 lines
3.6 KiB
Go
Raw Permalink Normal View History

2024-08-03 22:53:13 +05:00
package upwork
import (
"encoding/json"
"fmt"
"log"
"os"
"time"
)
// UpworkPipeLine runs a job search through an Upwork client and appends the
// returned job records to a dated JSONL file (see Run).
type UpworkPipeLine struct {
	// upworkClient issues the GraphQL requests; populated by InitPipeline.
	upworkClient *Upwork
}

// InitPipeline builds a pipeline backed by a client obtained from InitUpwork.
func InitPipeline() *UpworkPipeLine {
	client := InitUpwork()
	pipeline := new(UpworkPipeLine)
	pipeline.upworkClient = client
	return pipeline
}
// appendToJSONL appends each element of data to filename as one JSON document
// per line (JSON Lines). The file is opened in append mode and created with
// permission bits 0644 (subject to umask) if it does not exist; it is created
// even when data is empty.
//
// All items are marshalled before the file is touched. A marshalling failure
// therefore leaves the file unchanged instead of holding a partial batch, and
// the batch reaches the file in a single Write call. The error from Close is
// returned too, because on some filesystems a failed close is the only report
// that written data was not persisted.
func (u *UpworkPipeLine) appendToJSONL(data []interface{}, filename string) error {
	var buf []byte
	for _, item := range data {
		line, err := json.Marshal(item)
		if err != nil {
			return err
		}
		buf = append(buf, line...)
		buf = append(buf, '\n')
	}

	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	if _, err := file.Write(buf); err != nil {
		file.Close() // the write error is the more useful one to report
		return err
	}
	return file.Close()
}
// isResponseValid reports whether data decodes into a JSON object (or JSON
// null) that has no top-level "errors" key. Input that fails to decode is
// invalid. The mere presence of "errors", even with a null value, also makes
// the response invalid.
func (u *UpworkPipeLine) isResponseValid(data string) bool {
	var payload map[string]interface{}
	if json.Unmarshal([]byte(data), &payload) != nil {
		return false
	}
	if _, found := payload["errors"]; found {
		return false
	}
	return true
}
// getTotalDocuments sends query with variables and returns the total number
// of matching jobs, read from
// data.search.universalSearchNuxt.userJobSearchV1.paging.total.
//
// It returns an error in these cases:
//   - the request fails;
//   - isResponseValid rejects the body;
//   - the body is not valid JSON;
//   - any object on that path is missing or has the wrong type;
//   - total is not a number.
//
// Each level is checked with a comma-ok type assertion. A response such as
// {"data": null} therefore yields an error instead of a panic.
func (u *UpworkPipeLine) getTotalDocuments(query string, variables map[string]interface{}) (int, error) {
	resp, err := u.upworkClient.SendRequest(query, variables)
	if err != nil {
		return 0, err
	}
	if !u.isResponseValid(resp) {
		return 0, fmt.Errorf("invalid response")
	}
	var graphQLResp map[string]interface{}
	if err := json.Unmarshal([]byte(resp), &graphQLResp); err != nil {
		return 0, err
	}

	// Walk down to the "paging" object one checked level at a time; a
	// single-value type assertion on an unexpected value would panic.
	node := graphQLResp
	for _, key := range []string{"data", "search", "universalSearchNuxt", "userJobSearchV1", "paging"} {
		next, ok := node[key].(map[string]interface{})
		if !ok {
			return 0, fmt.Errorf("unexpected response structure: missing object %q", key)
		}
		node = next
	}

	// encoding/json decodes every JSON number into float64 when the target is
	// interface{}.
	total, ok := node["total"].(float64)
	if !ok {
		return 0, fmt.Errorf("total is not a number")
	}
	return int(total), nil
}
// handleRequest sends query with variables and extracts the job list at
// data.search.universalSearchNuxt.userJobSearchV1.results. It then appends
// that list to filename via appendToJSONL.
//
// When isResponseValid rejects the body, the raw body is printed to stdout
// before the error is returned. Every level of the results path is checked
// with a comma-ok type assertion. An unexpected response shape therefore
// yields an error rather than a panic that would abort Run's page loop.
func (u *UpworkPipeLine) handleRequest(query string, variables map[string]interface{}, filename string) error {
	resp, err := u.upworkClient.SendRequest(query, variables)
	if err != nil {
		return err
	}
	if !u.isResponseValid(resp) {
		// Dump the raw body so the rejected payload can be inspected.
		fmt.Println(resp)
		return fmt.Errorf("invalid response returned")
	}
	var graphQLResp map[string]interface{}
	if err := json.Unmarshal([]byte(resp), &graphQLResp); err != nil {
		return err
	}

	node := graphQLResp
	for _, key := range []string{"data", "search", "universalSearchNuxt", "userJobSearchV1"} {
		next, ok := node[key].(map[string]interface{})
		if !ok {
			return fmt.Errorf("unexpected response structure: missing object %q", key)
		}
		node = next
	}
	results, ok := node["results"].([]interface{})
	if !ok {
		return fmt.Errorf("unexpected response structure: %q is not a list", "results")
	}
	return u.appendToJSONL(results, filename)
}
// Run searches for userQuery (sort "recency", highlight on) using the query
// loaded from JobSearchQuery.gql. It appends every matching job to
// upwork_jobs_<YYYY-MM-DD>.jsonl, a path relative to the working directory.
//
// The total match count is fetched first, and pages of pageSize jobs are then
// requested until that total is covered. The page count is rounded up, so a
// trailing partial page (or a result set smaller than one page) is still
// fetched. A failed page is logged and skipped rather than aborting the run.
// Run returns an error only when the query file cannot be read or the total
// cannot be determined.
func (u *UpworkPipeLine) Run(userQuery string) error {
	const pageSize = 50

	query, err := readGraphQLQuery("JobSearchQuery.gql")
	if err != nil {
		return err
	}

	// Keep a direct handle on the paging map so the loop can update the offset
	// without re-asserting the nested structure each iteration.
	paging := map[string]interface{}{
		"offset": 0,
		"count":  pageSize,
	}
	variables := map[string]interface{}{
		"requestVariables": map[string]interface{}{
			"userQuery": userQuery,
			"sort":      "recency",
			"highlight": true,
			"paging":    paging,
		},
	}

	totalDocs, err := u.getTotalDocuments(query, variables)
	if err != nil {
		return err
	}
	fmt.Printf("%s has a total of %d jobs\n", userQuery, totalDocs)

	// Ceiling division. Flooring (totalDocs / pageSize) would drop the last
	// partial page, and would fetch nothing when totalDocs < pageSize.
	// (An earlier revision considered capping this at 100; the cap is disabled.)
	iterations := (totalDocs + pageSize - 1) / pageSize
	fmt.Printf("A total of %d iterations will be performed\n", iterations)

	filename := fmt.Sprintf("upwork_jobs_%s.jsonl", time.Now().Format("2006-01-02"))
	failed := 0
	for i := 0; i < iterations; i++ {
		fmt.Printf("Processing iteration %d of %d\n", i+1, iterations)
		// paging is the map nested inside variables, so this changes the
		// offset sent with the next request.
		paging["offset"] = i * pageSize
		if err := u.handleRequest(query, variables, filename); err != nil {
			failed++
			log.Printf("Error in iteration %d: %v", i+1, err)
		}
	}
	if failed > 0 {
		log.Printf("%d of %d iterations failed", failed, iterations)
	}
	fmt.Printf("Job data has been written to %s\n", filename)
	return nil
}