123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- use std::collections::{HashMap, HashSet};
- use std::{io};
- use std::convert::TryFrom;
- use std::fs::File;
- use std::future::Future;
- use std::path::{PathBuf};
- use std::io::{BufReader, BufWriter, ErrorKind, Read, Write};
- use std::sync::Arc;
- use std::time::Duration;
- use anyhow::anyhow;
- use futures::{StreamExt, TryStreamExt};
- use kuchiki::iter::NodeIterator;
- use serde_json::to_writer;
- use tokio::net::TcpStream;
- use tokio_rustls::client::TlsStream;
- use tokio_rustls::rustls::{ClientConfig, RootCertStore};
- use tokio_rustls::rustls::pki_types::ServerName;
- use crate::config::Config;
- use crate::{add_email};
- use tokio::task;
- use tokio::time::sleep;
- #[cfg(not(target_os = "wasi"))]
- use async_imap::{Client, Session};
- #[cfg(not(target_os = "wasi"))]
- use async_imap::extensions::idle::IdleResponse::NewData;
- #[cfg(target_os = "wasi")]
- use async_imap_wasi::{Client, Session};
- #[cfg(target_os = "wasi")]
- use async_imap_wasi::extensions::idle::IdleResponse::NewData;
- /// create TLS connect with the IMAP server
- pub async fn connect_to_imap() -> anyhow::Result<Client<TlsStream<TcpStream>>>{
- let mut root_cert_store = RootCertStore::empty();
- root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
- let config = ClientConfig::builder()
- .with_root_certificates(root_cert_store)
- .with_no_client_auth();
- let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
- let dnsname = ServerName::try_from(Config::global().imap_domain.clone()).unwrap();
- let stream = TcpStream::connect(format!("{}:{}", Config::global().imap_domain.clone(), Config::global().imap_port.clone())).await?;
- let stream = connector.connect(dnsname, stream).await?;
-
- #[cfg(not(target_os = "wasi"))]
- let client = async_imap::Client::new(stream);
- #[cfg(target_os = "wasi")]
- let client = async_imap_wasi::Client::new(stream);
-
- Ok(client)
- }
- /// download all emails from the server and persist them
- pub async fn download_email_from_imap() -> anyhow::Result<Vec<(u32, PathBuf)>>{
- let mut client = connect_to_imap().await?;
- let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
- let mut session = match session_result {
- Ok(session) => {session}
- Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
- };
-
- let mut stored_paths: Vec<(u32, PathBuf)> = vec![];
- match list_imap_folders(&mut session).await {
- Ok(folders) => {
- for folder in folders {
- println!("Start downloading {}", folder.clone());
- let new_paths = match fetch_and_store_emails(&mut session, folder.clone()).await {
- Ok(paths) => {
- println!("Emails fetched and stored successfully for {}.", folder);
- paths
- },
- Err(err) => {
- eprintln!("Error during email fetching and storing: {}", err);
- vec![]
- }
- };
- stored_paths.extend(new_paths);
- }
- }
- Err(_) => {return Err(anyhow!("Unable to retrieve folders from IMAP server"))}
- }
-
- // Logout from the IMAP session
- session.logout().await.expect("Unable to logout");
- Ok(stored_paths)
- }
- /// return Vec with all mailboxes
- pub async fn list_imap_folders(session: &mut Session<TlsStream<TcpStream>>) -> anyhow::Result<Vec<String>> {
- let mut folders: Vec<String> = vec![];
- let mut folders_stream = session.list(None, Some("*")).await?;
- while let Some(folder_result) = folders_stream.next().await {
- match folder_result {
- Ok(folder) => {
- folders.push(folder.name().to_string())
- }
- Err(_) => {}
- }
- }
- Ok(folders)
- }
- /// download all emails from one mailbox
- pub async fn fetch_and_store_emails(session: &mut Session<TlsStream<TcpStream>>, list: String) -> anyhow::Result<Vec<(u32, PathBuf)>> {
- session.select(list.clone()).await?;
- // Create directories for maildir
- std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("new"))
- .expect(&*("Unable to create 'new' directory in ".to_owned() + &*list.clone()));
- std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("cur"))
- .expect(&*("Unable to create 'cur' directory in ".to_owned() + &*list.clone()));
- std::fs::create_dir_all(Config::global().maildir.clone().join(list.clone()).join("tmp"))
- .expect(&*("Unable to create 'tmp' directory in ".to_owned() + &*list.clone()));
- let uids_path = Config::global().maildir.clone().join(".uids.json");
- let uids_server = session.uid_search("ALL").await.unwrap();
- let uids_local = read_local_uids(uids_path.clone()).unwrap();
- let mut stored_paths: Vec<(u32, PathBuf)> = vec![];
- for uid in uids_server {
- if let Some(uids) = uids_local.get(&list) {
- if uids.contains(&uid.to_string()) {
- continue;
- }
- }
- let mut messages_stream = session.uid_fetch(uid.to_string(), "(UID BODY[])").await?;
- let mut message = None;
- while let Some(message_result) = messages_stream.next().await {
- match message_result {
- Ok(message_ok) => {
- message = Some(message_ok);
- }
- Err(_) => {
- return Err(anyhow!("Unable to retrieve message"));
- }
- }
- }
- if let Some(msg) = &message {
- if let Some(body) = msg.body() {
- let mail_file = store(Config::global().maildir.clone().join(list.clone()), uid.clone().to_string(), "new".to_string(), body, "");
- match mail_file {
- Ok(file) => stored_paths.push((uid.to_string().parse().unwrap(), file)),
- Err(e) => eprintln!("Failed to store email: {}", e),
- }
- } else {
- eprintln!("Failed to retrieve email body");
- }
- if let Some(uid) = msg.uid {
- update_local_uids(uid.to_string(), list.clone(), uids_path.clone())
- .expect("Cannot write uid to .uids.json");
- }
- println!("Downloaded message with UID {}", uid.to_string());
- } else {
- return Err(anyhow!("No message found with the given UID"));
- }
- }
- Ok(stored_paths)
- }
- /// read uids that have been already downloaded
- fn read_local_uids(uids_path: PathBuf) -> anyhow::Result<HashMap<String, HashSet<String>>> {
- let file = match File::open(&uids_path) {
- Ok(file) => file,
- Err(_) => return Ok(HashMap::new()), // Propagate other errors
- };
- let mut reader = BufReader::new(file);
- let mut content = String::new();
- reader.read_to_string(&mut content)?;
- if content.trim().is_empty() {
- // Return an empty HashMap if the file is empty
- return Ok(HashMap::new());
- }
- // Deserialize the JSON into a temporary structure
- let data: HashMap<String, Vec<String>> = match serde_json::from_str(&content) {
- Ok(data) => data,
- Err(e) => return Err(anyhow!("Error deserializing local uids")), // Handle JSON parsing errors
- };
- // Convert Vec<String> to HashSet<String>
- let mut result: HashMap<String, HashSet<String>> = HashMap::new();
- for (key, value) in data {
- result.insert(key, value.into_iter().collect());
- }
- Ok(result)
- }
- /// add uid to the local list of downloaded uids
- fn update_local_uids(uid: String, list: String, uids_path: PathBuf) -> anyhow::Result<()> {
- let file = std::fs::OpenOptions::new()
- .write(true)
- .create(true)
- .open(uids_path.clone())?;
- let mut data = read_local_uids(uids_path.clone())?;
- data.entry(list)
- .or_insert_with(HashSet::new)
- .insert(uid);
- let writer = BufWriter::new(file);
- // Convert HashSet<String> back to Vec<String> for serialization
- let mut temp_data: HashMap<String, Vec<String>> = HashMap::new();
- for (key, value) in data {
- temp_data.insert(key.clone(), value.iter().cloned().collect());
- }
- // Serialize the data back to JSON and write to file
- to_writer(writer, &temp_data)?;
-
- Ok(())
- }
- /// delete single message remotely by uid (marks deleted and deletes from the server at the same time)
- pub async fn delete_email_by_uid(list: String, uid: u32) -> anyhow::Result<()>{
- // TODO split to mark_deleted() and expunge()
- let mut client = connect_to_imap().await?;
- let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
- let mut session = match session_result {
- Ok(session) => {session}
- Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
- };
-
- session.select(list.clone()).await?;
- let updates_stream = session.store(format!("{}", uid), "+FLAGS (\\Deleted)").await?;
- let _updates: Vec<_> = updates_stream.try_collect().await?;
- let _stream = session.expunge().await?;
- drop(_stream);
- session.logout().await?;
- Ok(())
- }
- /// create a folder remotely
- pub async fn create_folder(name: String) -> anyhow::Result<()>{
- let mut client = connect_to_imap().await?;
- let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
- let mut session = match session_result {
- Ok(session) => {session}
- Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
- };
- session.create(name.clone()).await?;
- session.logout().await?;
- Ok(())
- }
- /// rename a folder remotely
- pub async fn rename_folder(name: String, new_name: String) -> anyhow::Result<()>{
- let mut client = connect_to_imap().await?;
- let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
- let mut session = match session_result {
- Ok(session) => {session}
- Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
- };
- session.rename(name.clone(), new_name.clone()).await?;
- session.logout().await?;
- Ok(())
- }
- /// delete a folder remotely
- pub async fn delete_folder(name: String) -> anyhow::Result<()>{
- let mut client = connect_to_imap().await?;
- let mut session_result = client.login(Config::global().username.clone(), Config::global().password.clone()).await;
- let mut session = match session_result {
- Ok(session) => {session}
- Err(_) => {return Err(anyhow!("Unable to login to IMAP server"))}
- };
- session.delete(name.clone()).await?;
- session.logout().await?;
- Ok(())
- }
- /// run a async task to monitor any changes in the mailbox
- pub async fn check_for_updates(mailbox: String) -> anyhow::Result<()> {
- task::spawn(async move {
- let mut client = match connect_to_imap().await {
- Ok(client) => client,
- Err(e) => {
- return Err::<(), anyhow::Error>(anyhow!("Failed to connect to IMAP"));
- }
- };
- let session_result = client
- .login(Config::global().username.clone(), Config::global().password.clone())
- .await;
- let mut session = match session_result {
- Ok(session) => session,
- Err(_) => return Err(anyhow!("Unable to login to IMAP server")),
- };
- session.select(mailbox.clone()).await?;
- loop {
- let mut idle = session.idle();
- idle.init().await.unwrap();
- let (idle_wait, interrupt) = idle.wait();
- let idle_handle = task::spawn(async move {
- println!("IDLE: waiting for 30s"); // TODO remove debug prints
- sleep(Duration::from_secs(30)).await;
- println!("IDLE: waited 30 secs, now interrupting idle");
- drop(interrupt);
- });
- match idle_wait.await.unwrap() {
- // TODO add more cases like delete, move...
- NewData(data) => {
- // TODO do not update all emails (IMAP returns * {number} RECENT) and do it only for one mailbox
- let new_paths = download_email_from_imap().await.expect("Cannot download new emails");
- for (uid, path) in new_paths.clone() {
- match add_email(path.clone(), uid.clone()){
- Ok(_) => {}
- Err(_) => {println!("Error adding email from {:?}", path.clone())}
- };
- }
- }
- reason => {
- println!("IDLE failed {:?}", reason);
- }
- }
- // Ensure the idle handle is dropped before the next loop iteration
- idle_handle.await.unwrap();
- // Reassign session to prevent ownership issues in the next loop iteration
- session = idle.done().await.unwrap();
- }
- });
- Ok(())
- }
/// Writes a raw email into a Maildir-style directory tree rooted at `path`.
///
/// The data is first written and synced to a new file in `<path>/tmp` and then renamed to
/// `<path>/<subfolder>/email_<uid><info>`, so a partially written message never appears in
/// the destination folder (see <http://www.courier-mta.org/maildir.html>).
///
/// # Arguments
///
/// * `path` - mailbox directory; `tmp` and `subfolder` must already exist below it.
/// * `uid` - message UID, used to build the file name `email_<uid>`.
/// * `subfolder` - destination folder below `path` (for example `new`).
/// * `data` - raw message bytes.
/// * `info` - suffix appended verbatim to the final file name.
///
/// # Returns
///
/// The final path of the stored message.
///
/// # Errors
///
/// Returns the underlying I/O error when the temporary file cannot be created or written, or
/// when the rename fails. The temporary file is removed in those cases (best effort).
fn store(path: PathBuf, uid: String, subfolder: String, data: &[u8], info: &str) -> io::Result<PathBuf> {
    /// Removes the temporary file when dropped, unless `path_to_unlink` was cleared after the
    /// message reached its final location.
    struct UnlinkOnError {
        path_to_unlink: Option<PathBuf>,
    }
    impl Drop for UnlinkOnError {
        fn drop(&mut self) {
            if let Some(path) = self.path_to_unlink.take() {
                // Best effort to remove it
                std::fs::remove_file(path).ok();
            }
        }
    }

    let id = format!("email_{uid}");
    let tmp_dir = path.join("tmp");

    // Pick a temporary name that does not exist yet. A leftover `tmp/email_<uid>` (for example
    // from an interrupted run) must not be reused; retrying the very same name would never
    // succeed, so every retry gets a fresh numeric suffix.
    let mut attempt: u32 = 0;
    let (mut file, tmppath) = loop {
        let candidate = if attempt == 0 {
            tmp_dir.join(&id)
        } else {
            tmp_dir.join(format!("{id}.{attempt}"))
        };
        match std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&candidate)
        {
            Ok(file) => break (file, candidate),
            Err(err) if err.kind() == ErrorKind::AlreadyExists => {
                attempt = match attempt.checked_add(1) {
                    Some(next) => next,
                    None => return Err(err),
                };
            }
            Err(err) => return Err(err),
        }
    };

    // Ensure that we remove the temporary file on failure
    let mut unlink_guard = UnlinkOnError {
        path_to_unlink: Some(tmppath.clone()),
    };
    file.write_all(data)?;
    file.sync_all()?;
    // Close the handle before renaming; some platforms refuse to move an open file.
    drop(file);

    let newpath = path.join(subfolder).join(format!("{id}{info}"));
    std::fs::rename(&tmppath, &newpath)?;
    // The file now lives at its final location; nothing left to clean up.
    unlink_guard.path_to_unlink = None;
    Ok(newpath)
}
|